This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f4ff4e775b13cac48ee3a369d4eff2368a49de7 Author: TsReaper <[email protected]> AuthorDate: Thu Jul 18 19:55:48 2019 +0800 [FLINK-13323][table-runtime-blink] Add tests for complex data formats --- .../flink/table/dataformat/BinaryRowTest.java | 539 ++++++++++++++++++++- .../flink/table/dataformat/DataFormatTestUtil.java | 68 ++- .../apache/flink/table/dataformat/DecimalTest.java | 8 + .../apache/flink/table/util/SegmentsUtilTest.java | 149 +++++- 4 files changed, 751 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java index b4a6387..6351e78 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java @@ -18,31 +18,60 @@ package org.apache.flink.table.dataformat; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LocalDateSerializer; +import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer; +import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer; +import org.apache.flink.api.common.typeutils.base.SqlDateSerializer; +import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer; +import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.RandomAccessInputView; import org.apache.flink.runtime.io.disk.RandomAccessOutputView; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.typeutils.BaseArraySerializer; +import org.apache.flink.table.typeutils.BaseMapSerializer; import org.apache.flink.table.typeutils.BaseRowSerializer; import org.apache.flink.table.typeutils.BinaryRowSerializer; import org.junit.Assert; import org.junit.Test; +import java.io.EOFException; import java.io.IOException; import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import static org.apache.flink.table.dataformat.BinaryString.fromBytes; import static org.apache.flink.table.dataformat.BinaryString.fromString; +import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** @@ -61,7 +90,7 @@ public class BinaryRowTest { MemorySegment segment = MemorySegmentFactory.wrap(new byte[100]); BinaryRow row = new BinaryRow(2); row.pointTo(segment, 10, 48); - assertTrue(row.getSegments()[0] == segment); + assertSame(row.getSegments()[0], segment); row.setInt(0, 5); row.setDouble(1, 5.8D); } @@ -85,7 +114,7 @@ public class BinaryRowTest { assertTrue(row.isNullAt(0)); assertEquals(55, row.getShort(5)); assertEquals(22, row.getLong(2)); - assertEquals(true, row.getBoolean(4)); + assertTrue(row.getBoolean(4)); assertEquals((byte) 66, row.getByte(6)); assertEquals(77f, row.getFloat(7), 0); } @@ -131,7 +160,7 @@ public class BinaryRowTest { } @Test - public void testWriteString() throws IOException { + public void testWriteString() { { // litter byte[] BinaryRow row = new BinaryRow(1); @@ -242,7 +271,7 @@ public class BinaryRowTest { assertEquals((byte) 99, row.getByte(2)); assertEquals(87.1d, row.getDouble(6), 0); assertEquals(26.1f, row.getFloat(7), 0); - assertEquals(true, row.getBoolean(1)); + assertTrue(row.getBoolean(1)); assertEquals("1234567", row.getString(3).toString()); assertEquals("12345678", row.getString(5).toString()); assertEquals("啦啦啦啦啦我是快乐的粉刷匠", row.getString(9).toString()); @@ -269,7 +298,7 @@ public class BinaryRowTest { } @Test - public void anyNullTest() throws IOException { + public void anyNullTest() { { BinaryRow row = new BinaryRow(3); BinaryRowWriter writer = new BinaryRowWriter(row); @@ -302,7 +331,7 @@ public class BinaryRowTest { } @Test - public void testSingleSegmentBinaryRowHashCode() throws IOException { + public void testSingleSegmentBinaryRowHashCode() { final Random rnd = new Random(System.currentTimeMillis()); // test hash stabilization BinaryRow row = new BinaryRow(13); @@ -347,7 +376,7 @@ public class BinaryRowTest { } @Test - public void testHeaderSize() throws IOException { + public void testHeaderSize() { assertEquals(8, BinaryRow.calculateBitSetWidthInBytes(56)); assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(57)); assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(120)); @@ -355,7 +384,7 @@ public class BinaryRowTest { } @Test - public void testHeader() throws IOException { + public void testHeader() { BinaryRow row = new BinaryRow(2); BinaryRowWriter writer = new BinaryRowWriter(row); @@ -456,4 +485,498 @@ public class BinaryRowTest { Assert.assertArrayEquals(bytes1, row.getBinary(0)); Assert.assertArrayEquals(bytes2, row.getBinary(1)); } + + @Test + public void testBinaryArray() { + // 1. array test + BinaryArray array = new BinaryArray(); + BinaryArrayWriter arrayWriter = new BinaryArrayWriter( + array, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType())); + + arrayWriter.writeInt(0, 6); + arrayWriter.setNullInt(1); + arrayWriter.writeInt(2, 666); + arrayWriter.complete(); + + assertEquals(array.getInt(0), 6); + assertTrue(array.isNullAt(1)); + assertEquals(array.getInt(2), 666); + + // 2. test write array to binary row + BinaryRow row = new BinaryRow(1); + BinaryRowWriter rowWriter = new BinaryRowWriter(row); + BaseArraySerializer serializer = new BaseArraySerializer( + DataTypes.INT().getLogicalType(), new ExecutionConfig()); + rowWriter.writeArray(0, array, serializer); + rowWriter.complete(); + + BinaryArray array2 = (BinaryArray) row.getArray(0); + assertEquals(array, array2); + assertEquals(6, array2.getInt(0)); + assertTrue(array2.isNullAt(1)); + assertEquals(666, array2.getInt(2)); + } + + @Test + public void testGenericArray() { + // 1. array test + Integer[] javaArray = {6, null, 666}; + GenericArray array = new GenericArray(javaArray, 3, false); + + assertEquals(array.getInt(0), 6); + assertTrue(array.isNullAt(1)); + assertEquals(array.getInt(2), 666); + + // 2. test write array to binary row + BinaryRow row2 = new BinaryRow(1); + BinaryRowWriter writer2 = new BinaryRowWriter(row2); + BaseArraySerializer serializer = new BaseArraySerializer( + DataTypes.INT().getLogicalType(), new ExecutionConfig()); + writer2.writeArray(0, array, serializer); + writer2.complete(); + + BaseArray array2 = row2.getArray(0); + assertEquals(6, array2.getInt(0)); + assertTrue(array2.isNullAt(1)); + assertEquals(666, array2.getInt(2)); + } + + @Test + public void testBinaryMap() { + BinaryArray array1 = new BinaryArray(); + BinaryArrayWriter writer1 = new BinaryArrayWriter( + array1, 4, BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType())); + writer1.writeInt(0, 6); + writer1.writeInt(1, 5); + writer1.writeInt(2, 666); + writer1.writeInt(3, 0); + writer1.complete(); + + BinaryArray array2 = new BinaryArray(); + BinaryArrayWriter writer2 = new BinaryArrayWriter( + array2, 4, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType())); + writer2.writeString(0, BinaryString.fromString("6")); + writer2.writeString(1, BinaryString.fromString("5")); + writer2.writeString(2, BinaryString.fromString("666")); + writer2.setNullAt(3, DataTypes.STRING().getLogicalType()); + writer2.complete(); + + BinaryMap binaryMap = BinaryMap.valueOf(array1, array2); + + BinaryRow row = new BinaryRow(1); + BinaryRowWriter rowWriter = new BinaryRowWriter(row); + BaseMapSerializer serializer = new BaseMapSerializer( + DataTypes.STRING().getLogicalType(), + DataTypes.INT().getLogicalType(), + new ExecutionConfig()); + rowWriter.writeMap(0, binaryMap, serializer); + rowWriter.complete(); + + BinaryMap map = (BinaryMap) row.getMap(0); + BinaryArray key = map.keyArray(); + BinaryArray value = map.valueArray(); + + assertEquals(binaryMap, map); + assertEquals(array1, key); + assertEquals(array2, value); + + assertEquals(5, key.getInt(1)); + assertEquals(BinaryString.fromString("5"), value.getString(1)); + assertEquals(0, key.getInt(3)); + assertTrue(value.isNullAt(3)); + } + + @Test + public void testGenericMap() { + Map javaMap = new HashMap(); + javaMap.put(6, BinaryString.fromString("6")); + javaMap.put(5, BinaryString.fromString("5")); + javaMap.put(666, BinaryString.fromString("666")); + javaMap.put(0, null); + + GenericMap genericMap = new GenericMap(javaMap); + + BinaryRow row = new BinaryRow(1); + BinaryRowWriter rowWriter = new BinaryRowWriter(row); + BaseMapSerializer serializer = new BaseMapSerializer( + DataTypes.INT().getLogicalType(), + DataTypes.STRING().getLogicalType(), + new ExecutionConfig()); + rowWriter.writeMap(0, genericMap, serializer); + rowWriter.complete(); + + Map map = row.getMap(0).toJavaMap(DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()); + assertEquals(BinaryString.fromString("6"), map.get(6)); + assertEquals(BinaryString.fromString("5"), map.get(5)); + assertEquals(BinaryString.fromString("666"), map.get(666)); + assertTrue(map.containsKey(0)); + assertNull(map.get(0)); + } + + @Test + public void testGenericObject() throws Exception { + + GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class); + TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig()); + + BinaryRow row = new BinaryRow(4); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeInt(0, 0); + + BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 1), genericSerializer); + writer.writeGeneric(1, myObj1); + BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new MyObj(123, 5.0), genericSerializer); + myObj2.ensureMaterialized(); + writer.writeGeneric(2, myObj2); + BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 1), genericSerializer); + writer.writeGeneric(3, myObj3); + writer.complete(); + + assertTestGenericObjectRow(row, genericSerializer); + + // getBytes from var-length memorySegments. + BinaryRowSerializer serializer = new BinaryRowSerializer(4); + MemorySegment[] memorySegments = new MemorySegment[3]; + ArrayList<MemorySegment> memorySegmentList = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]); + memorySegmentList.add(memorySegments[i]); + } + RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64); + serializer.serializeToPages(row, out); + + BinaryRow mapRow = serializer.mapFromPages(new RandomAccessInputView(memorySegmentList, 64)); + assertTestGenericObjectRow(mapRow, genericSerializer); + } + + private void assertTestGenericObjectRow(BinaryRow row, TypeSerializer<MyObj> serializer) { + assertEquals(0, row.getInt(0)); + BinaryGeneric<MyObj> binaryGeneric1 = row.getGeneric(1); + BinaryGeneric<MyObj> binaryGeneric2 = row.getGeneric(2); + BinaryGeneric<MyObj> binaryGeneric3 = row.getGeneric(3); + assertEquals(new MyObj(0, 1), BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric1, serializer)); + assertEquals(new MyObj(123, 5.0), BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric2, serializer)); + assertEquals(new MyObj(1, 1), BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric3, serializer)); + } + + @Test + public void testDateAndTimeAsGenericObject() { + BinaryRow row = new BinaryRow(7); + BinaryRowWriter writer = new BinaryRowWriter(row); + + LocalDate localDate = LocalDate.of(2019, 7, 16); + LocalTime localTime = LocalTime.of(17, 31); + LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime); + + writer.writeInt(0, 0); + writer.writeGeneric(1, new BinaryGeneric<>(new Date(123), SqlDateSerializer.INSTANCE)); + writer.writeGeneric(2, new BinaryGeneric<>(new Time(456), SqlTimeSerializer.INSTANCE)); + writer.writeGeneric(3, new BinaryGeneric<>(new Timestamp(789), SqlTimestampSerializer.INSTANCE)); + writer.writeGeneric(4, new BinaryGeneric<>(localDate, LocalDateSerializer.INSTANCE)); + writer.writeGeneric(5, new BinaryGeneric<>(localTime, LocalTimeSerializer.INSTANCE)); + writer.writeGeneric(6, new BinaryGeneric<>(localDateTime, LocalDateTimeSerializer.INSTANCE)); + writer.complete(); + + assertEquals(new Date(123), BinaryGeneric.getJavaObjectFromBinaryGeneric( + row.getGeneric(1), SqlDateSerializer.INSTANCE)); + assertEquals(new Time(456), BinaryGeneric.getJavaObjectFromBinaryGeneric( + row.getGeneric(2), SqlTimeSerializer.INSTANCE)); + assertEquals(new Timestamp(789), BinaryGeneric.getJavaObjectFromBinaryGeneric( + row.getGeneric(3), SqlTimestampSerializer.INSTANCE)); + assertEquals(localDate, BinaryGeneric.getJavaObjectFromBinaryGeneric( + row.getGeneric(4), LocalDateSerializer.INSTANCE)); + assertEquals(localTime, BinaryGeneric.getJavaObjectFromBinaryGeneric( + row.getGeneric(5), LocalTimeSerializer.INSTANCE)); + assertEquals(localDateTime, BinaryGeneric.getJavaObjectFromBinaryGeneric( + row.getGeneric(6), LocalDateTimeSerializer.INSTANCE)); + } + + @Test + public void testSerializeVariousSize() throws IOException { + // in this test, we are going to start serializing from the i-th byte (i in 0...`segSize`) + // and the size of the row we're going to serialize is j bytes + // (j in `rowFixLength` to the maximum length we can write) + + int segSize = 64; + int segTotalNumber = 3; + + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + Random random = new Random(); + byte[] bytes = new byte[1024]; + random.nextBytes(bytes); + writer.writeBinary(0, bytes); + writer.complete(); + + MemorySegment[] memorySegments = new MemorySegment[segTotalNumber]; + Map<MemorySegment, Integer> msIndex = new HashMap<>(); + for (int i = 0; i < segTotalNumber; i++) { + memorySegments[i] = MemorySegmentFactory.wrap(new byte[segSize]); + msIndex.put(memorySegments[i], i); + } + + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + int rowSizeInt = 4; + // note that as there is only one field in the row, the fixed-length part is 16 bytes (header + 1 field) + int rowFixLength = 16; + for (int i = 0; i < segSize; i++) { + // this is the maximum row size we can serialize + // if we are going to serialize from the i-th byte of the input view + int maxRowSize = (segSize * segTotalNumber) - i - rowSizeInt; + if (segSize - i < rowFixLength + rowSizeInt) { + // oops, we can't write the whole fixed-length part in the first segment + // because the remaining space is too small, so we have to start serializing from the second segment. + // when serializing, we need to first write the length of the row, + // then write the fixed-length part of the row. + maxRowSize -= segSize - i; + } + for (int j = rowFixLength; j < maxRowSize; j++) { + // ok, now we're going to serialize a row of j bytes + testSerialize(row, memorySegments, msIndex, serializer, i, j); + } + } + } + + private void testSerialize( + BinaryRow row, MemorySegment[] memorySegments, + Map<MemorySegment, Integer> msIndex, BinaryRowSerializer serializer, int position, + int rowSize) throws IOException { + RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64); + out.skipBytesToWrite(position); + row.setTotalSize(rowSize); + + // this `row` contains random bytes, and now we're going to serialize `rowSize` bytes + // (not including the row header) of the contents + serializer.serializeToPages(row, out); + + // let's see how many segments we have written + int segNumber = msIndex.get(out.getCurrentSegment()) + 1; + int lastSegSize = out.getCurrentPositionInSegment(); + + // now deserialize from the written segments + ArrayList<MemorySegment> segments = new ArrayList<>(Arrays.asList(memorySegments).subList(0, segNumber)); + RandomAccessInputView input = new RandomAccessInputView(segments, 64, lastSegSize); + input.skipBytesToRead(position); + BinaryRow mapRow = serializer.mapFromPages(input); + + assertEquals(row, mapRow); + } + + @Test + public void testZeroOutPaddingGeneric() { + + GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class); + TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig()); + + Random random = new Random(); + byte[] bytes = new byte[1024]; + + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + + // let's random the bytes + writer.reset(); + random.nextBytes(bytes); + writer.writeBinary(0, bytes); + writer.reset(); + writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer)); + writer.complete(); + int hash1 = row.hashCode(); + + writer.reset(); + random.nextBytes(bytes); + writer.writeBinary(0, bytes); + writer.reset(); + writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer)); + writer.complete(); + int hash2 = row.hashCode(); + + assertEquals(hash1, hash2); + } + + @Test + public void testZeroOutPaddingString() { + + Random random = new Random(); + byte[] bytes = new byte[1024]; + + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + + writer.reset(); + random.nextBytes(bytes); + writer.writeBinary(0, bytes); + writer.reset(); + writer.writeString(0, BinaryString.fromString("wahahah")); + writer.complete(); + int hash1 = row.hashCode(); + + writer.reset(); + random.nextBytes(bytes); + writer.writeBinary(0, bytes); + writer.reset(); + writer.writeString(0, BinaryString.fromString("wahahah")); + writer.complete(); + int hash2 = row.hashCode(); + + assertEquals(hash1, hash2); + } + + @Test + public void testHashAndCopy() throws IOException { + MemorySegment[] segments = new MemorySegment[3]; + for (int i = 0; i < 3; i++) { + segments[i] = MemorySegmentFactory.wrap(new byte[64]); + } + RandomAccessOutputView out = new RandomAccessOutputView(segments, 64); + BinaryRowSerializer serializer = new BinaryRowSerializer(2); + + BinaryRow row = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeString(0, BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahah")); + writer.writeString(1, BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahaa")); + writer.complete(); + serializer.serializeToPages(row, out); + + ArrayList<MemorySegment> segmentList = new ArrayList<>(Arrays.asList(segments)); + RandomAccessInputView input = new RandomAccessInputView(segmentList, 64, 64); + + BinaryRow mapRow = serializer.mapFromPages(input); + assertEquals(row, mapRow); + assertEquals(row.getString(0), mapRow.getString(0)); + assertEquals(row.getString(1), mapRow.getString(1)); + assertNotEquals(row.getString(0), mapRow.getString(1)); + + // test if the hash code before and after serialization are the same + assertEquals(row.hashCode(), mapRow.hashCode()); + assertEquals(row.getString(0).hashCode(), mapRow.getString(0).hashCode()); + assertEquals(row.getString(1).hashCode(), mapRow.getString(1).hashCode()); + + // test if the copy method produce a row with the same contents + assertEquals(row.copy(), mapRow.copy()); + assertEquals(row.getString(0).copy(), mapRow.getString(0).copy()); + assertEquals(row.getString(1).copy(), mapRow.getString(1).copy()); + } + + @Test + public void testSerStringToKryo() throws IOException { + KryoSerializer<BinaryString> serializer = new KryoSerializer<>( + BinaryString.class, new ExecutionConfig()); + + BinaryString string = BinaryString.fromString("hahahahaha"); + RandomAccessOutputView out = new RandomAccessOutputView( + new MemorySegment[]{MemorySegmentFactory.wrap(new byte[1024])}, 64); + serializer.serialize(string, out); + + RandomAccessInputView input = new RandomAccessInputView( + new ArrayList<>(Collections.singletonList(out.getCurrentSegment())), 64, 64); + BinaryString newStr = serializer.deserialize(input); + + assertEquals(string, newStr); + } + + @Test + public void testSerializerPages() throws IOException { + // Boundary tests + BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow(); + BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow(); + testSerializerPagesInternal(row24, row160); + testSerializerPagesInternal(row24, DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160)); + } + + private void testSerializerPagesInternal(BinaryRow row24, BinaryRow row160) throws IOException { + BinaryRowSerializer serializer = new BinaryRowSerializer(2); + + // 1. test middle row with just on the edge1 + { + MemorySegment[] segments = new MemorySegment[4]; + for (int i = 0; i < segments.length; i++) { + segments[i] = MemorySegmentFactory.wrap(new byte[64]); + } + RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size()); + serializer.serializeToPages(row24, out); + serializer.serializeToPages(row160, out); + serializer.serializeToPages(row24, out); + + RandomAccessInputView in = new RandomAccessInputView( + new ArrayList<>(Arrays.asList(segments)), + segments[0].size(), + out.getCurrentPositionInSegment()); + + BinaryRow retRow = new BinaryRow(2); + List<BinaryRow> rets = new ArrayList<>(); + while (true) { + try { + retRow = serializer.mapFromPages(retRow, in); + } catch (EOFException e) { + break; + } + rets.add(retRow.copy()); + } + assertEquals(row24, rets.get(0)); + assertEquals(row160, rets.get(1)); + assertEquals(row24, rets.get(2)); + } + + // 2. test middle row with just on the edge2 + { + MemorySegment[] segments = new MemorySegment[7]; + for (int i = 0; i < segments.length; i++) { + segments[i] = MemorySegmentFactory.wrap(new byte[64]); + } + RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size()); + serializer.serializeToPages(row24, out); + serializer.serializeToPages(row160, out); + serializer.serializeToPages(row160, out); + + RandomAccessInputView in = new RandomAccessInputView( + new ArrayList<>(Arrays.asList(segments)), + segments[0].size(), + out.getCurrentPositionInSegment()); + + BinaryRow retRow = new BinaryRow(2); + List<BinaryRow> rets = new ArrayList<>(); + while (true) { + try { + retRow = serializer.mapFromPages(retRow, in); + } catch (EOFException e) { + break; + } + rets.add(retRow.copy()); + } + assertEquals(row24, rets.get(0)); + assertEquals(row160, rets.get(1)); + assertEquals(row160, rets.get(2)); + } + + // 3. test last row with just on the edge + { + MemorySegment[] segments = new MemorySegment[3]; + for (int i = 0; i < segments.length; i++) { + segments[i] = MemorySegmentFactory.wrap(new byte[64]); + } + RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size()); + serializer.serializeToPages(row24, out); + serializer.serializeToPages(row160, out); + + RandomAccessInputView in = new RandomAccessInputView( + new ArrayList<>(Arrays.asList(segments)), + segments[0].size(), + out.getCurrentPositionInSegment()); + + BinaryRow retRow = new BinaryRow(2); + List<BinaryRow> rets = new ArrayList<>(); + while (true) { + try { + retRow = serializer.mapFromPages(retRow, in); + } catch (EOFException e) { + break; + } + rets.add(retRow.copy()); + } + assertEquals(row24, rets.get(0)); + assertEquals(row160, rets.get(1)); + } + } } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java index d89fa5b..11d3f11 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java @@ -20,10 +20,76 @@ package org.apache.flink.table.dataformat; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.commons.lang3.RandomStringUtils; + +import static org.junit.Assert.assertEquals; + /** * Utils for testing data formats. */ -class DataFormatTestUtil { +public class DataFormatTestUtil { + + /** + * Get a binary row of 24 bytes long. + */ + public static BinaryRow get24BytesBinaryRow() { + // header (8 bytes) + 2 * string in fixed-length part (8 bytes each) + BinaryRow row = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeString(0, BinaryString.fromString(RandomStringUtils.randomNumeric(2))); + writer.writeString(1, BinaryString.fromString(RandomStringUtils.randomNumeric(2))); + writer.complete(); + return row; + } + + /** + * Get a binary row of 160 bytes long. + */ + public static BinaryRow get160BytesBinaryRow() { + // header (8 bytes) + + // 72 byte length string (8 bytes in fixed-length, 72 bytes in variable-length) + + // 64 byte length string (8 bytes in fixed-length, 64 bytes in variable-length) + BinaryRow row = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeString(0, BinaryString.fromString(RandomStringUtils.randomNumeric(72))); + writer.writeString(1, BinaryString.fromString(RandomStringUtils.randomNumeric(64))); + writer.complete(); + return row; + } + + /** + * Get a binary row consisting of 6 segments. + * The bytes of the returned row is the same with the given input binary row. + */ + public static BinaryRow getMultiSeg160BytesBinaryRow(BinaryRow row160) { + BinaryRow multiSegRow160 = new BinaryRow(2); + MemorySegment[] segments = new MemorySegment[6]; + int baseOffset = 8; + int posInSeg = baseOffset; + int remainSize = 160; + for (int i = 0; i < segments.length; i++) { + segments[i] = MemorySegmentFactory.wrap(new byte[32]); + int copy = Math.min(32 - posInSeg, remainSize); + row160.getSegments()[0].copyTo(160 - remainSize, segments[i], posInSeg, copy); + remainSize -= copy; + posInSeg = 0; + } + multiSegRow160.pointTo(segments, baseOffset, 160); + assertEquals(row160, multiSegRow160); + return multiSegRow160; + } + + /** + * Get a binary row consisting of 2 segments. + * Its first segment is the same with the given input binary row, while its second segment is empty. + */ + public static BinaryRow getMultiSeg160BytesInOneSegRow(BinaryRow row160) { + MemorySegment[] segments = new MemorySegment[2]; + segments[0] = row160.getSegments()[0]; + segments[1] = MemorySegmentFactory.wrap(new byte[row160.getSegments()[0].size()]); + row160.pointTo(segments, 0, row160.getSizeInBytes()); + return row160; + } /** * Split the given byte array into two memory segments. diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java index e19299c..e66351a 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java @@ -114,4 +114,12 @@ public class DecimalTest { Assert.assertEquals(0, Decimal.zero(20, 2).toBigDecimal().intValue()); Assert.assertEquals(0, Decimal.zero(20, 2).toBigDecimal().intValue()); } + + @Test + public void testToString() { + String val = "0.0000000000000000001"; + Assert.assertEquals(val, Decimal.castFrom(val, 39, val.length() - 2).toString()); + val = "123456789012345678901234567890123456789"; + Assert.assertEquals(val, Decimal.castFrom(val, 39, 0).toString()); + } } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java index c1808bc..a8355f1 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java @@ -19,11 +19,20 @@ package org.apache.flink.table.util; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.dataformat.BinaryRow; import org.apache.flink.table.dataformat.BinaryRowTest; +import org.apache.flink.table.dataformat.DataFormatTestUtil; +import org.apache.flink.table.dataformat.util.BinaryRowUtil; import org.junit.Assert; import org.junit.Test; +import static org.apache.flink.table.dataformat.util.BinaryRowUtil.BYTE_ARRAY_BASE_OFFSET; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** * Test for {@link SegmentsUtil}, most is covered by {@link BinaryRowTest}, * this just test some boundary scenarios testing. @@ -53,10 +62,142 @@ public class SegmentsUtilTest { segments2[0] = MemorySegmentFactory.wrap(new byte[]{6, 0, 2, 5}); segments2[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15, 18}); - Assert.assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, segments2, 0, 0)); - Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3)); - Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6)); - Assert.assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7)); + assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, segments2, 0, 0)); + assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3)); + assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6)); + assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7)); + } + + @Test + public void testBoundaryByteArrayEquals() { + byte[] bytes1 = new byte[5]; + bytes1[3] = 81; + byte[] bytes2 = new byte[100]; + bytes2[3] = 81; + bytes2[4] = 81; + + assertTrue(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 4)); + assertFalse(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 5)); + assertTrue(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 0)); + } + + @Test + public void testBoundaryEquals() { + BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow(); + BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow(); + BinaryRow varRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160); + BinaryRow varRow160InOne = DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160); + + assertEquals(row160, varRow160InOne); + assertEquals(varRow160, varRow160InOne); + assertEquals(row160, varRow160); + assertEquals(varRow160InOne, varRow160); + + assertNotEquals(row24, row160); + assertNotEquals(row24, varRow160); + assertNotEquals(row24, varRow160InOne); + + assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, row160.getSegments(), 0, 0)); + assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, varRow160.getSegments(), 0, 0)); + + // test var segs + MemorySegment[] segments1 = new MemorySegment[2]; + segments1[0] = MemorySegmentFactory.wrap(new byte[32]); + segments1[1] = MemorySegmentFactory.wrap(new byte[32]); + MemorySegment[] segments2 = new MemorySegment[3]; + segments2[0] = MemorySegmentFactory.wrap(new byte[16]); + segments2[1] = MemorySegmentFactory.wrap(new byte[16]); + segments2[2] = MemorySegmentFactory.wrap(new byte[16]); + + segments1[0].put(9, (byte) 1); + assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 14, 14)); + segments2[1].put(7, (byte) 1); + assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 14, 14)); + assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 14)); + assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 16)); + + segments2[2].put(7, (byte) 1); + assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 32, 14)); + } + + @Test + public void testBoundaryCopy() { + MemorySegment[] segments1 = new MemorySegment[2]; + segments1[0] = MemorySegmentFactory.wrap(new byte[32]); + segments1[1] = MemorySegmentFactory.wrap(new byte[32]); + segments1[0].put(15, (byte) 5); + segments1[1].put(15, (byte) 6); + + { + byte[] bytes = new byte[64]; + MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}; + + SegmentsUtil.copyToBytes(segments1, 0, bytes, 0, 64); + assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 0, 64)); + } + + { + byte[] bytes = new byte[64]; + MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}; + + SegmentsUtil.copyToBytes(segments1, 32, bytes, 0, 14); + assertTrue(SegmentsUtil.equals(segments1, 32, segments2, 0, 14)); + } + + { + byte[] bytes = new byte[64]; + MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}; + + SegmentsUtil.copyToBytes(segments1, 34, bytes, 0, 14); + assertTrue(SegmentsUtil.equals(segments1, 34, segments2, 0, 14)); + } + } + + @Test + public void testCopyToUnsafe() { + MemorySegment[] segments1 = new MemorySegment[2]; + segments1[0] = MemorySegmentFactory.wrap(new byte[32]); + segments1[1] = MemorySegmentFactory.wrap(new byte[32]); + segments1[0].put(15, (byte) 5); + segments1[1].put(15, (byte) 6); + + { + byte[] bytes = new byte[64]; + MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}; + + SegmentsUtil.copyToUnsafe(segments1, 0, bytes, BYTE_ARRAY_BASE_OFFSET, 64); + assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 0, 64)); + } + + { + byte[] bytes = new byte[64]; + MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}; + + SegmentsUtil.copyToUnsafe(segments1, 32, bytes, BYTE_ARRAY_BASE_OFFSET, 14); + assertTrue(SegmentsUtil.equals(segments1, 32, segments2, 0, 14)); + } + + { + byte[] bytes = new byte[64]; + MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}; + + SegmentsUtil.copyToUnsafe(segments1, 34, bytes, BYTE_ARRAY_BASE_OFFSET, 14); + assertTrue(SegmentsUtil.equals(segments1, 34, segments2, 0, 14)); + } + } + + @Test + public void testFind() { + MemorySegment[] segments1 = new MemorySegment[2]; + segments1[0] = MemorySegmentFactory.wrap(new byte[32]); + segments1[1] = MemorySegmentFactory.wrap(new byte[32]); + MemorySegment[] segments2 = new MemorySegment[3]; + segments2[0] = MemorySegmentFactory.wrap(new byte[16]); + segments2[1] = MemorySegmentFactory.wrap(new byte[16]); + segments2[2] = MemorySegmentFactory.wrap(new byte[16]); + + assertEquals(34, SegmentsUtil.find(segments1, 34, 0, segments2, 0, 0)); + assertEquals(-1, SegmentsUtil.find(segments1, 34, 0, segments2, 0, 15)); } }
