This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f4ff4e775b13cac48ee3a369d4eff2368a49de7
Author: TsReaper <[email protected]>
AuthorDate: Thu Jul 18 19:55:48 2019 +0800

    [FLINK-13323][table-runtime-blink] Add tests for complex data formats
---
 .../flink/table/dataformat/BinaryRowTest.java      | 539 ++++++++++++++++++++-
 .../flink/table/dataformat/DataFormatTestUtil.java |  68 ++-
 .../apache/flink/table/dataformat/DecimalTest.java |   8 +
 .../apache/flink/table/util/SegmentsUtilTest.java  | 149 +++++-
 4 files changed, 751 insertions(+), 13 deletions(-)

diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index b4a6387..6351e78 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -18,31 +18,60 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
 import org.apache.flink.table.typeutils.BaseRowSerializer;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
 import static org.apache.flink.table.dataformat.BinaryString.fromBytes;
 import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -61,7 +90,7 @@ public class BinaryRowTest {
                MemorySegment segment = MemorySegmentFactory.wrap(new 
byte[100]);
                BinaryRow row = new BinaryRow(2);
                row.pointTo(segment, 10, 48);
-               assertTrue(row.getSegments()[0] == segment);
+               assertSame(row.getSegments()[0], segment);
                row.setInt(0, 5);
                row.setDouble(1, 5.8D);
        }
@@ -85,7 +114,7 @@ public class BinaryRowTest {
                assertTrue(row.isNullAt(0));
                assertEquals(55, row.getShort(5));
                assertEquals(22, row.getLong(2));
-               assertEquals(true, row.getBoolean(4));
+               assertTrue(row.getBoolean(4));
                assertEquals((byte) 66, row.getByte(6));
                assertEquals(77f, row.getFloat(7), 0);
        }
@@ -131,7 +160,7 @@ public class BinaryRowTest {
        }
 
        @Test
-       public void testWriteString() throws IOException {
+       public void testWriteString() {
                {
                        // litter byte[]
                        BinaryRow row = new BinaryRow(1);
@@ -242,7 +271,7 @@ public class BinaryRowTest {
                assertEquals((byte) 99, row.getByte(2));
                assertEquals(87.1d, row.getDouble(6), 0);
                assertEquals(26.1f, row.getFloat(7), 0);
-               assertEquals(true, row.getBoolean(1));
+               assertTrue(row.getBoolean(1));
                assertEquals("1234567", row.getString(3).toString());
                assertEquals("12345678", row.getString(5).toString());
                assertEquals("啦啦啦啦啦我是快乐的粉刷匠", row.getString(9).toString());
@@ -269,7 +298,7 @@ public class BinaryRowTest {
        }
 
        @Test
-       public void anyNullTest() throws IOException {
+       public void anyNullTest() {
                {
                        BinaryRow row = new BinaryRow(3);
                        BinaryRowWriter writer = new BinaryRowWriter(row);
@@ -302,7 +331,7 @@ public class BinaryRowTest {
        }
 
        @Test
-       public void testSingleSegmentBinaryRowHashCode() throws IOException {
+       public void testSingleSegmentBinaryRowHashCode() {
                final Random rnd = new Random(System.currentTimeMillis());
                // test hash stabilization
                BinaryRow row = new BinaryRow(13);
@@ -347,7 +376,7 @@ public class BinaryRowTest {
        }
 
        @Test
-       public void testHeaderSize() throws IOException {
+       public void testHeaderSize() {
                assertEquals(8, BinaryRow.calculateBitSetWidthInBytes(56));
                assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(57));
                assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(120));
@@ -355,7 +384,7 @@ public class BinaryRowTest {
        }
 
        @Test
-       public void testHeader() throws IOException {
+       public void testHeader() {
                BinaryRow row = new BinaryRow(2);
                BinaryRowWriter writer = new BinaryRowWriter(row);
 
@@ -456,4 +485,498 @@ public class BinaryRowTest {
                Assert.assertArrayEquals(bytes1, row.getBinary(0));
                Assert.assertArrayEquals(bytes2, row.getBinary(1));
        }
+
+       @Test
+       public void testBinaryArray() {
+               // 1. array test
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
+                       array, 3, 
BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType()));
+
+               arrayWriter.writeInt(0, 6);
+               arrayWriter.setNullInt(1);
+               arrayWriter.writeInt(2, 666);
+               arrayWriter.complete();
+
+               assertEquals(array.getInt(0), 6);
+               assertTrue(array.isNullAt(1));
+               assertEquals(array.getInt(2), 666);
+
+               // 2. test write array to binary row
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+               BaseArraySerializer serializer = new BaseArraySerializer(
+                       DataTypes.INT().getLogicalType(), new 
ExecutionConfig());
+               rowWriter.writeArray(0, array, serializer);
+               rowWriter.complete();
+
+               BinaryArray array2 = (BinaryArray) row.getArray(0);
+               assertEquals(array, array2);
+               assertEquals(6, array2.getInt(0));
+               assertTrue(array2.isNullAt(1));
+               assertEquals(666, array2.getInt(2));
+       }
+
+       @Test
+       public void testGenericArray() {
+               // 1. array test
+               Integer[] javaArray = {6, null, 666};
+               GenericArray array = new GenericArray(javaArray, 3, false);
+
+               assertEquals(array.getInt(0), 6);
+               assertTrue(array.isNullAt(1));
+               assertEquals(array.getInt(2), 666);
+
+               // 2. test write array to binary row
+               BinaryRow row2 = new BinaryRow(1);
+               BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+               BaseArraySerializer serializer = new BaseArraySerializer(
+                       DataTypes.INT().getLogicalType(), new 
ExecutionConfig());
+               writer2.writeArray(0, array, serializer);
+               writer2.complete();
+
+               BaseArray array2 = row2.getArray(0);
+               assertEquals(6, array2.getInt(0));
+               assertTrue(array2.isNullAt(1));
+               assertEquals(666, array2.getInt(2));
+       }
+
+       @Test
+       public void testBinaryMap() {
+               BinaryArray array1 = new BinaryArray();
+               BinaryArrayWriter writer1 = new BinaryArrayWriter(
+                       array1, 4, 
BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType()));
+               writer1.writeInt(0, 6);
+               writer1.writeInt(1, 5);
+               writer1.writeInt(2, 666);
+               writer1.writeInt(3, 0);
+               writer1.complete();
+
+               BinaryArray array2 = new BinaryArray();
+               BinaryArrayWriter writer2 = new BinaryArrayWriter(
+                       array2, 4, 
BinaryArray.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType()));
+               writer2.writeString(0, BinaryString.fromString("6"));
+               writer2.writeString(1, BinaryString.fromString("5"));
+               writer2.writeString(2, BinaryString.fromString("666"));
+               writer2.setNullAt(3, DataTypes.STRING().getLogicalType());
+               writer2.complete();
+
+               BinaryMap binaryMap = BinaryMap.valueOf(array1, array2);
+
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+               BaseMapSerializer serializer = new BaseMapSerializer(
+                       DataTypes.STRING().getLogicalType(),
+                       DataTypes.INT().getLogicalType(),
+                       new ExecutionConfig());
+               rowWriter.writeMap(0, binaryMap, serializer);
+               rowWriter.complete();
+
+               BinaryMap map = (BinaryMap) row.getMap(0);
+               BinaryArray key = map.keyArray();
+               BinaryArray value = map.valueArray();
+
+               assertEquals(binaryMap, map);
+               assertEquals(array1, key);
+               assertEquals(array2, value);
+
+               assertEquals(5, key.getInt(1));
+               assertEquals(BinaryString.fromString("5"), value.getString(1));
+               assertEquals(0, key.getInt(3));
+               assertTrue(value.isNullAt(3));
+       }
+
+       @Test
+       public void testGenericMap() {
+               Map javaMap = new HashMap();
+               javaMap.put(6, BinaryString.fromString("6"));
+               javaMap.put(5, BinaryString.fromString("5"));
+               javaMap.put(666, BinaryString.fromString("666"));
+               javaMap.put(0, null);
+
+               GenericMap genericMap = new GenericMap(javaMap);
+
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+               BaseMapSerializer serializer = new BaseMapSerializer(
+                       DataTypes.INT().getLogicalType(),
+                       DataTypes.STRING().getLogicalType(),
+                       new ExecutionConfig());
+               rowWriter.writeMap(0, genericMap, serializer);
+               rowWriter.complete();
+
+               Map map = 
row.getMap(0).toJavaMap(DataTypes.INT().getLogicalType(), 
DataTypes.STRING().getLogicalType());
+               assertEquals(BinaryString.fromString("6"), map.get(6));
+               assertEquals(BinaryString.fromString("5"), map.get(5));
+               assertEquals(BinaryString.fromString("666"), map.get(666));
+               assertTrue(map.containsKey(0));
+               assertNull(map.get(0));
+       }
+
+       @Test
+       public void testGenericObject() throws Exception {
+
+               GenericTypeInfo<MyObj> info = new 
GenericTypeInfo<>(MyObj.class);
+               TypeSerializer<MyObj> genericSerializer = 
info.createSerializer(new ExecutionConfig());
+
+               BinaryRow row = new BinaryRow(4);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeInt(0, 0);
+
+               BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 
1), genericSerializer);
+               writer.writeGeneric(1, myObj1);
+               BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new 
MyObj(123, 5.0), genericSerializer);
+               myObj2.ensureMaterialized();
+               writer.writeGeneric(2, myObj2);
+               BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 
1), genericSerializer);
+               writer.writeGeneric(3, myObj3);
+               writer.complete();
+
+               assertTestGenericObjectRow(row, genericSerializer);
+
+               // getBytes from var-length memorySegments.
+               BinaryRowSerializer serializer = new BinaryRowSerializer(4);
+               MemorySegment[] memorySegments = new MemorySegment[3];
+               ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
+               for (int i = 0; i < 3; i++) {
+                       memorySegments[i] = MemorySegmentFactory.wrap(new 
byte[64]);
+                       memorySegmentList.add(memorySegments[i]);
+               }
+               RandomAccessOutputView out = new 
RandomAccessOutputView(memorySegments, 64);
+               serializer.serializeToPages(row, out);
+
+               BinaryRow mapRow = serializer.mapFromPages(new 
RandomAccessInputView(memorySegmentList, 64));
+               assertTestGenericObjectRow(mapRow, genericSerializer);
+       }
+
+       private void assertTestGenericObjectRow(BinaryRow row, 
TypeSerializer<MyObj> serializer) {
+               assertEquals(0, row.getInt(0));
+               BinaryGeneric<MyObj> binaryGeneric1 = row.getGeneric(1);
+               BinaryGeneric<MyObj> binaryGeneric2 = row.getGeneric(2);
+               BinaryGeneric<MyObj> binaryGeneric3 = row.getGeneric(3);
+               assertEquals(new MyObj(0, 1), 
BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric1, serializer));
+               assertEquals(new MyObj(123, 5.0), 
BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric2, serializer));
+               assertEquals(new MyObj(1, 1), 
BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric3, serializer));
+       }
+
+       @Test
+       public void testDateAndTimeAsGenericObject() {
+               BinaryRow row = new BinaryRow(7);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+
+               LocalDate localDate = LocalDate.of(2019, 7, 16);
+               LocalTime localTime = LocalTime.of(17, 31);
+               LocalDateTime localDateTime = LocalDateTime.of(localDate, 
localTime);
+
+               writer.writeInt(0, 0);
+               writer.writeGeneric(1, new BinaryGeneric<>(new Date(123), 
SqlDateSerializer.INSTANCE));
+               writer.writeGeneric(2, new BinaryGeneric<>(new Time(456), 
SqlTimeSerializer.INSTANCE));
+               writer.writeGeneric(3, new BinaryGeneric<>(new Timestamp(789), 
SqlTimestampSerializer.INSTANCE));
+               writer.writeGeneric(4, new BinaryGeneric<>(localDate, 
LocalDateSerializer.INSTANCE));
+               writer.writeGeneric(5, new BinaryGeneric<>(localTime, 
LocalTimeSerializer.INSTANCE));
+               writer.writeGeneric(6, new BinaryGeneric<>(localDateTime, 
LocalDateTimeSerializer.INSTANCE));
+               writer.complete();
+
+               assertEquals(new Date(123), 
BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       row.getGeneric(1), SqlDateSerializer.INSTANCE));
+               assertEquals(new Time(456), 
BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       row.getGeneric(2), SqlTimeSerializer.INSTANCE));
+               assertEquals(new Timestamp(789), 
BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       row.getGeneric(3), SqlTimestampSerializer.INSTANCE));
+               assertEquals(localDate, 
BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       row.getGeneric(4), LocalDateSerializer.INSTANCE));
+               assertEquals(localTime, 
BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       row.getGeneric(5), LocalTimeSerializer.INSTANCE));
+               assertEquals(localDateTime, 
BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       row.getGeneric(6), LocalDateTimeSerializer.INSTANCE));
+       }
+
+       @Test
+       public void testSerializeVariousSize() throws IOException {
+               // in this test, we are going to start serializing from the 
i-th byte (i in 0...`segSize`)
+               // and the size of the row we're going to serialize is j bytes
+               // (j in `rowFixLength` to the maximum length we can write)
+
+               int segSize = 64;
+               int segTotalNumber = 3;
+
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               Random random = new Random();
+               byte[] bytes = new byte[1024];
+               random.nextBytes(bytes);
+               writer.writeBinary(0, bytes);
+               writer.complete();
+
+               MemorySegment[] memorySegments = new 
MemorySegment[segTotalNumber];
+               Map<MemorySegment, Integer> msIndex = new HashMap<>();
+               for (int i = 0; i < segTotalNumber; i++) {
+                       memorySegments[i] = MemorySegmentFactory.wrap(new 
byte[segSize]);
+                       msIndex.put(memorySegments[i], i);
+               }
+
+               BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+               int rowSizeInt = 4;
+               // note that as there is only one field in the row, the 
fixed-length part is 16 bytes (header + 1 field)
+               int rowFixLength = 16;
+               for (int i = 0; i < segSize; i++) {
+                       // this is the maximum row size we can serialize
+                       // if we are going to serialize from the i-th byte of 
the input view
+                       int maxRowSize = (segSize * segTotalNumber) - i - 
rowSizeInt;
+                       if (segSize - i < rowFixLength + rowSizeInt) {
+                               // oops, we can't write the whole fixed-length 
part in the first segment
+                               // because the remaining space is too small, so 
we have to start serializing from the second segment.
+                               // when serializing, we need to first write the 
length of the row,
+                               // then write the fixed-length part of the row.
+                               maxRowSize -= segSize - i;
+                       }
+                       for (int j = rowFixLength; j < maxRowSize; j++) {
+                               // ok, now we're going to serialize a row of j 
bytes
+                               testSerialize(row, memorySegments, msIndex, 
serializer, i, j);
+                       }
+               }
+       }
+
+       private void testSerialize(
+               BinaryRow row, MemorySegment[] memorySegments,
+               Map<MemorySegment, Integer> msIndex, BinaryRowSerializer 
serializer, int position,
+               int rowSize) throws IOException {
+               RandomAccessOutputView out = new 
RandomAccessOutputView(memorySegments, 64);
+               out.skipBytesToWrite(position);
+               row.setTotalSize(rowSize);
+
+               // this `row` contains random bytes, and now we're going to 
serialize `rowSize` bytes
+               // (not including the row header) of the contents
+               serializer.serializeToPages(row, out);
+
+               // let's see how many segments we have written
+               int segNumber = msIndex.get(out.getCurrentSegment()) + 1;
+               int lastSegSize = out.getCurrentPositionInSegment();
+
+               // now deserialize from the written segments
+               ArrayList<MemorySegment> segments = new 
ArrayList<>(Arrays.asList(memorySegments).subList(0, segNumber));
+               RandomAccessInputView input = new 
RandomAccessInputView(segments, 64, lastSegSize);
+               input.skipBytesToRead(position);
+               BinaryRow mapRow = serializer.mapFromPages(input);
+
+               assertEquals(row, mapRow);
+       }
+
+       @Test
+       public void testZeroOutPaddingGeneric() {
+
+               GenericTypeInfo<MyObj> info = new 
GenericTypeInfo<>(MyObj.class);
+               TypeSerializer<MyObj> genericSerializer = 
info.createSerializer(new ExecutionConfig());
+
+               Random random = new Random();
+               byte[] bytes = new byte[1024];
+
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+
+               // let's randomize the bytes
+               writer.reset();
+               random.nextBytes(bytes);
+               writer.writeBinary(0, bytes);
+               writer.reset();
+               writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), 
genericSerializer));
+               writer.complete();
+               int hash1 = row.hashCode();
+
+               writer.reset();
+               random.nextBytes(bytes);
+               writer.writeBinary(0, bytes);
+               writer.reset();
+               writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), 
genericSerializer));
+               writer.complete();
+               int hash2 = row.hashCode();
+
+               assertEquals(hash1, hash2);
+       }
+
+       @Test
+       public void testZeroOutPaddingString() {
+
+               Random random = new Random();
+               byte[] bytes = new byte[1024];
+
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+
+               writer.reset();
+               random.nextBytes(bytes);
+               writer.writeBinary(0, bytes);
+               writer.reset();
+               writer.writeString(0, BinaryString.fromString("wahahah"));
+               writer.complete();
+               int hash1 = row.hashCode();
+
+               writer.reset();
+               random.nextBytes(bytes);
+               writer.writeBinary(0, bytes);
+               writer.reset();
+               writer.writeString(0, BinaryString.fromString("wahahah"));
+               writer.complete();
+               int hash2 = row.hashCode();
+
+               assertEquals(hash1, hash2);
+       }
+
+       @Test
+       public void testHashAndCopy() throws IOException {
+               MemorySegment[] segments = new MemorySegment[3];
+               for (int i = 0; i < 3; i++) {
+                       segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+               }
+               RandomAccessOutputView out = new 
RandomAccessOutputView(segments, 64);
+               BinaryRowSerializer serializer = new BinaryRowSerializer(2);
+
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeString(0, 
BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahah"));
+               writer.writeString(1, 
BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahaa"));
+               writer.complete();
+               serializer.serializeToPages(row, out);
+
+               ArrayList<MemorySegment> segmentList = new 
ArrayList<>(Arrays.asList(segments));
+               RandomAccessInputView input = new 
RandomAccessInputView(segmentList, 64, 64);
+
+               BinaryRow mapRow = serializer.mapFromPages(input);
+               assertEquals(row, mapRow);
+               assertEquals(row.getString(0), mapRow.getString(0));
+               assertEquals(row.getString(1), mapRow.getString(1));
+               assertNotEquals(row.getString(0), mapRow.getString(1));
+
+               // test if the hash codes before and after serialization are the 
same
+               assertEquals(row.hashCode(), mapRow.hashCode());
+               assertEquals(row.getString(0).hashCode(), 
mapRow.getString(0).hashCode());
+               assertEquals(row.getString(1).hashCode(), 
mapRow.getString(1).hashCode());
+
+               // test if the copy method produces a row with the same contents
+               assertEquals(row.copy(), mapRow.copy());
+               assertEquals(row.getString(0).copy(), 
mapRow.getString(0).copy());
+               assertEquals(row.getString(1).copy(), 
mapRow.getString(1).copy());
+       }
+
+       @Test
+       public void testSerStringToKryo() throws IOException {
+               KryoSerializer<BinaryString> serializer = new KryoSerializer<>(
+                       BinaryString.class, new ExecutionConfig());
+
+               BinaryString string = BinaryString.fromString("hahahahaha");
+               RandomAccessOutputView out = new RandomAccessOutputView(
+                       new MemorySegment[]{MemorySegmentFactory.wrap(new 
byte[1024])}, 64);
+               serializer.serialize(string, out);
+
+               RandomAccessInputView input = new RandomAccessInputView(
+                       new 
ArrayList<>(Collections.singletonList(out.getCurrentSegment())), 64, 64);
+               BinaryString newStr = serializer.deserialize(input);
+
+               assertEquals(string, newStr);
+       }
+
+       @Test
+       public void testSerializerPages() throws IOException {
+               // Boundary tests
+               BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow();
+               BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow();
+               testSerializerPagesInternal(row24, row160);
+               testSerializerPagesInternal(row24, 
DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160));
+       }
+
+       private void testSerializerPagesInternal(BinaryRow row24, BinaryRow 
row160) throws IOException {
+               BinaryRowSerializer serializer = new BinaryRowSerializer(2);
+
+               // 1. test the middle row being just on the edge (case 1)
+               {
+                       MemorySegment[] segments = new MemorySegment[4];
+                       for (int i = 0; i < segments.length; i++) {
+                               segments[i] = MemorySegmentFactory.wrap(new 
byte[64]);
+                       }
+                       RandomAccessOutputView out = new 
RandomAccessOutputView(segments, segments[0].size());
+                       serializer.serializeToPages(row24, out);
+                       serializer.serializeToPages(row160, out);
+                       serializer.serializeToPages(row24, out);
+
+                       RandomAccessInputView in = new RandomAccessInputView(
+                               new ArrayList<>(Arrays.asList(segments)),
+                               segments[0].size(),
+                               out.getCurrentPositionInSegment());
+
+                       BinaryRow retRow = new BinaryRow(2);
+                       List<BinaryRow> rets = new ArrayList<>();
+                       while (true) {
+                               try {
+                                       retRow = 
serializer.mapFromPages(retRow, in);
+                               } catch (EOFException e) {
+                                       break;
+                               }
+                               rets.add(retRow.copy());
+                       }
+                       assertEquals(row24, rets.get(0));
+                       assertEquals(row160, rets.get(1));
+                       assertEquals(row24, rets.get(2));
+               }
+
+               // 2. test the middle row being just on the edge (case 2)
+               {
+                       MemorySegment[] segments = new MemorySegment[7];
+                       for (int i = 0; i < segments.length; i++) {
+                               segments[i] = MemorySegmentFactory.wrap(new 
byte[64]);
+                       }
+                       RandomAccessOutputView out = new 
RandomAccessOutputView(segments, segments[0].size());
+                       serializer.serializeToPages(row24, out);
+                       serializer.serializeToPages(row160, out);
+                       serializer.serializeToPages(row160, out);
+
+                       RandomAccessInputView in = new RandomAccessInputView(
+                               new ArrayList<>(Arrays.asList(segments)),
+                               segments[0].size(),
+                               out.getCurrentPositionInSegment());
+
+                       BinaryRow retRow = new BinaryRow(2);
+                       List<BinaryRow> rets = new ArrayList<>();
+                       while (true) {
+                               try {
+                                       retRow = 
serializer.mapFromPages(retRow, in);
+                               } catch (EOFException e) {
+                                       break;
+                               }
+                               rets.add(retRow.copy());
+                       }
+                       assertEquals(row24, rets.get(0));
+                       assertEquals(row160, rets.get(1));
+                       assertEquals(row160, rets.get(2));
+               }
+
+               // 3. test the last row being just on the edge
+               {
+                       MemorySegment[] segments = new MemorySegment[3];
+                       for (int i = 0; i < segments.length; i++) {
+                               segments[i] = MemorySegmentFactory.wrap(new 
byte[64]);
+                       }
+                       RandomAccessOutputView out = new 
RandomAccessOutputView(segments, segments[0].size());
+                       serializer.serializeToPages(row24, out);
+                       serializer.serializeToPages(row160, out);
+
+                       RandomAccessInputView in = new RandomAccessInputView(
+                               new ArrayList<>(Arrays.asList(segments)),
+                               segments[0].size(),
+                               out.getCurrentPositionInSegment());
+
+                       BinaryRow retRow = new BinaryRow(2);
+                       List<BinaryRow> rets = new ArrayList<>();
+                       while (true) {
+                               try {
+                                       retRow = 
serializer.mapFromPages(retRow, in);
+                               } catch (EOFException e) {
+                                       break;
+                               }
+                               rets.add(retRow.copy());
+                       }
+                       assertEquals(row24, rets.get(0));
+                       assertEquals(row160, rets.get(1));
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
index d89fa5b..11d3f11 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
@@ -20,10 +20,76 @@ package org.apache.flink.table.dataformat;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 
+import org.apache.commons.lang3.RandomStringUtils;
+
+import static org.junit.Assert.assertEquals;
+
 /**
  * Utils for testing data formats.
  */
-class DataFormatTestUtil {
+public class DataFormatTestUtil {
+
+       /**
+        * Builds a two-field binary row that is 24 bytes long.
+        *
+        * <p>Both fields are random 2-digit numeric strings, so the content
+        * may differ between calls.
+        *
+        * @return the newly written {@link BinaryRow}
+        */
+       public static BinaryRow get24BytesBinaryRow() {
+               // header (8 bytes) + 2 * string in fixed-length part (8 bytes 
each)
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeString(0, 
BinaryString.fromString(RandomStringUtils.randomNumeric(2)));
+               writer.writeString(1, 
BinaryString.fromString(RandomStringUtils.randomNumeric(2)));
+               writer.complete();
+               return row;
+       }
+
+       /**
+        * Builds a two-field binary row that is 160 bytes long.
+        *
+        * <p>The fields are random numeric strings of 72 and 64 characters,
+        * so the content may differ between calls.
+        *
+        * @return the newly written {@link BinaryRow}
+        */
+       public static BinaryRow get160BytesBinaryRow() {
+               // header (8 bytes) +
+               // 72 byte length string (8 bytes in fixed-length, 72 bytes in 
variable-length) +
+               // 64 byte length string (8 bytes in fixed-length, 64 bytes in 
variable-length)
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeString(0, 
BinaryString.fromString(RandomStringUtils.randomNumeric(72)));
+               writer.writeString(1, 
BinaryString.fromString(RandomStringUtils.randomNumeric(64)));
+               writer.complete();
+               return row;
+       }
+
+       /**
+        * Builds a binary row backed by 6 segments of 32 bytes each whose
+        * bytes are the same as those of the given 160-byte row.
+        *
+        * <p>The new row starts at offset 8 of its first segment. The source
+        * bytes are read from {@code row160.getSegments()[0]} starting at
+        * position 0.
+        * NOTE(review): this presumably expects the input row to start at
+        * offset 0 of its first segment - confirm against callers.
+        *
+        * <p>The result is asserted to be equal to the input before returning.
+        *
+        * @param row160 the 160-byte row whose bytes are copied
+        * @return a new multi-segment row that equals {@code row160}
+        */
+       public static BinaryRow getMultiSeg160BytesBinaryRow(BinaryRow row160) {
+               BinaryRow multiSegRow160 = new BinaryRow(2);
+               MemorySegment[] segments = new MemorySegment[6];
+               // the row data begins 8 bytes into the first segment
+               int baseOffset = 8;
+               int posInSeg = baseOffset;
+               int remainSize = 160;
+               for (int i = 0; i < segments.length; i++) {
+                       segments[i] = MemorySegmentFactory.wrap(new byte[32]);
+                       // fill what is left of this segment: 24 bytes for the first,
+                       // 32 bytes for the next four and the final 8 for the last
+                       int copy = Math.min(32 - posInSeg, remainSize);
+                       row160.getSegments()[0].copyTo(160 - remainSize, 
segments[i], posInSeg, copy);
+                       remainSize -= copy;
+                       // only the first segment is written from a non-zero position
+                       posInSeg = 0;
+               }
+               multiSegRow160.pointTo(segments, baseOffset, 160);
+               assertEquals(row160, multiSegRow160);
+               return multiSegRow160;
+       }
+
+       /**
+        * Re-points the given row at an array of 2 segments and returns it.
+        * The first segment is the row's original first segment; the second is
+        * a newly allocated segment of the same size.
+        *
+        * <p>NOTE(review): no copy is made. The input row itself is modified
+        * through {@code pointTo} (with offset 0) and then returned, so the
+        * caller's argument and the return value are the same instance.
+        *
+        * @param row160 the row to re-point; it is modified in place
+        * @return the same {@code row160} instance, now backed by 2 segments
+        */
+       public static BinaryRow getMultiSeg160BytesInOneSegRow(BinaryRow 
row160) {
+               MemorySegment[] segments = new MemorySegment[2];
+               segments[0] = row160.getSegments()[0];
+               segments[1] = MemorySegmentFactory.wrap(new 
byte[row160.getSegments()[0].size()]);
+               row160.pointTo(segments, 0, row160.getSizeInBytes());
+               return row160;
+       }
 
        /**
         * Split the given byte array into two memory segments.
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
index e19299c..e66351a 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
@@ -114,4 +114,12 @@ public class DecimalTest {
                Assert.assertEquals(0, Decimal.zero(20, 
2).toBigDecimal().intValue());
                Assert.assertEquals(0, Decimal.zero(20, 
2).toBigDecimal().intValue());
        }
+
+       /**
+        * Checks that {@code toString()} of a decimal created through
+        * {@code Decimal.castFrom} with precision 39 yields the original
+        * string, both for a small fraction and for a 39-digit integer.
+        */
+       @Test
+       public void testToString() {
+               // scale is the number of digits after the leading "0."
+               String val = "0.0000000000000000001";
+               Assert.assertEquals(val, Decimal.castFrom(val, 39, val.length() 
- 2).toString());
+               // 39 digits with scale 0
+               val = "123456789012345678901234567890123456789";
+               Assert.assertEquals(val, Decimal.castFrom(val, 39, 
0).toString());
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
index c1808bc..a8355f1 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
@@ -19,11 +19,20 @@ package org.apache.flink.table.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowTest;
+import org.apache.flink.table.dataformat.DataFormatTestUtil;
+import org.apache.flink.table.dataformat.util.BinaryRowUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import static 
org.apache.flink.table.dataformat.util.BinaryRowUtil.BYTE_ARRAY_BASE_OFFSET;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for {@link SegmentsUtil}, most is covered by {@link BinaryRowTest},
  * this just test some boundary scenarios testing.
@@ -53,10 +62,142 @@ public class SegmentsUtilTest {
                segments2[0] = MemorySegmentFactory.wrap(new byte[]{6, 0, 2, 
5});
                segments2[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15, 
18});
 
-               Assert.assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 
0, segments2, 0, 0));
-               Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 
1, 3));
-               Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 
1, 6));
-               Assert.assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 
1, 7));
+               assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, 
segments2, 0, 0));
+               assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3));
+               assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6));
+               assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7));
+       }
+
+       /**
+        * Checks {@code BinaryRowUtil.byteArrayEquals} on two arrays of
+        * different lengths (5 and 100) that agree on their first 4 bytes and
+        * differ at index 4.
+        */
+       @Test
+       public void testBoundaryByteArrayEquals() {
+               byte[] bytes1 = new byte[5];
+               bytes1[3] = 81;
+               byte[] bytes2 = new byte[100];
+               bytes2[3] = 81;
+               bytes2[4] = 81;
+
+               // the first 4 bytes are identical
+               assertTrue(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 4));
+               // index 4 is 0 in bytes1 but 81 in bytes2
+               assertFalse(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 5));
+               // comparing zero bytes is expected to succeed
+               assertTrue(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 0));
+       }
+
+       /**
+        * Checks equality between binary rows with different segment layouts
+        * and {@code SegmentsUtil.equals} on ranges that cross segment
+        * boundaries.
+        */
+       @Test
+       public void testBoundaryEquals() {
+               BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow();
+               BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow();
+               BinaryRow varRow160 = 
DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
+               // NOTE(review): getMultiSeg160BytesInOneSegRow re-points and returns
+               // its argument, so row160 and varRow160InOne are the same instance
+               // from here on; assertions comparing the two compare one object
+               // with itself.
+               BinaryRow varRow160InOne = 
DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160);
+
+               assertEquals(row160, varRow160InOne);
+               assertEquals(varRow160, varRow160InOne);
+               assertEquals(row160, varRow160);
+               assertEquals(varRow160InOne, varRow160);
+
+               assertNotEquals(row24, row160);
+               assertNotEquals(row24, varRow160);
+               assertNotEquals(row24, varRow160InOne);
+
+               // zero-length comparisons are expected to be equal
+               assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, 
row160.getSegments(), 0, 0));
+               assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, 
varRow160.getSegments(), 0, 0));
+
+               // test var segs
+               // 2 x 32 bytes on one side, 3 x 16 bytes on the other, all zero
+               MemorySegment[] segments1 = new MemorySegment[2];
+               segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+               segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+               MemorySegment[] segments2 = new MemorySegment[3];
+               segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+               segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+               segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+               // only segments1 holds a non-zero byte (at offset 9), so the
+               // ranges differ
+               segments1[0].put(9, (byte) 1);
+               assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 14, 
14));
+               // index 7 of segments2[1] is overall offset 16 + 7 = 23 = 14 + 9,
+               // assuming offsets address the segment array as one range
+               segments2[1].put(7, (byte) 1);
+               assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 14, 
14));
+               // shifting both start offsets by 2 keeps the marked bytes aligned
+               assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 
14));
+               assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 
16));
+
+               // index 7 of segments2[2] is overall offset 32 + 7 = 39
+               segments2[2].put(7, (byte) 1);
+               assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 32, 
14));
+       }
+
+       /**
+        * Checks {@code SegmentsUtil.copyToBytes} from a source of two 32-byte
+        * segments into a fresh 64-byte array, for three source ranges. Each
+        * copy is verified with {@code SegmentsUtil.equals} against the array
+        * wrapped in a single segment.
+        */
+       @Test
+       public void testBoundaryCopy() {
+               MemorySegment[] segments1 = new MemorySegment[2];
+               segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+               segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+               // one non-zero marker byte in each source segment
+               segments1[0].put(15, (byte) 5);
+               segments1[1].put(15, (byte) 6);
+
+               // copy all 64 bytes, spanning both source segments
+               {
+                       byte[] bytes = new byte[64];
+                       MemorySegment[] segments2 = new 
MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+                       SegmentsUtil.copyToBytes(segments1, 0, bytes, 0, 64);
+                       assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 
0, 64));
+               }
+
+               // copy 14 bytes from offset 32 (= size of the first segment)
+               {
+                       byte[] bytes = new byte[64];
+                       MemorySegment[] segments2 = new 
MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+                       SegmentsUtil.copyToBytes(segments1, 32, bytes, 0, 14);
+                       assertTrue(SegmentsUtil.equals(segments1, 32, 
segments2, 0, 14));
+               }
+
+               // copy 14 bytes from offset 34 (past the size of the first segment)
+               {
+                       byte[] bytes = new byte[64];
+                       MemorySegment[] segments2 = new 
MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+                       SegmentsUtil.copyToBytes(segments1, 34, bytes, 0, 14);
+                       assertTrue(SegmentsUtil.equals(segments1, 34, 
segments2, 0, 14));
+               }
+       }
+
+       /**
+        * Checks {@code SegmentsUtil.copyToUnsafe} from a source of two 32-byte
+        * segments into a fresh 64-byte array, for the same three source ranges
+        * as {@code testBoundaryCopy}.
+        *
+        * <p>NOTE(review): {@code BYTE_ARRAY_BASE_OFFSET} is presumably the
+        * unsafe base offset of {@code byte[]} data, i.e. the copy targets
+        * {@code bytes[0]} - confirm in {@code BinaryRowUtil}.
+        */
+       @Test
+       public void testCopyToUnsafe() {
+               MemorySegment[] segments1 = new MemorySegment[2];
+               segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+               segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+               // one non-zero marker byte in each source segment
+               segments1[0].put(15, (byte) 5);
+               segments1[1].put(15, (byte) 6);
+
+               // copy all 64 bytes, spanning both source segments
+               {
+                       byte[] bytes = new byte[64];
+                       MemorySegment[] segments2 = new 
MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+                       SegmentsUtil.copyToUnsafe(segments1, 0, bytes, 
BYTE_ARRAY_BASE_OFFSET, 64);
+                       assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 
0, 64));
+               }
+
+               // copy 14 bytes from offset 32 (= size of the first segment)
+               {
+                       byte[] bytes = new byte[64];
+                       MemorySegment[] segments2 = new 
MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+                       SegmentsUtil.copyToUnsafe(segments1, 32, bytes, 
BYTE_ARRAY_BASE_OFFSET, 14);
+                       assertTrue(SegmentsUtil.equals(segments1, 32, 
segments2, 0, 14));
+               }
+
+               // copy 14 bytes from offset 34 (past the size of the first segment)
+               {
+                       byte[] bytes = new byte[64];
+                       MemorySegment[] segments2 = new 
MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+                       SegmentsUtil.copyToUnsafe(segments1, 34, bytes, 
BYTE_ARRAY_BASE_OFFSET, 14);
+                       assertTrue(SegmentsUtil.equals(segments1, 34, 
segments2, 0, 14));
+               }
+       }
+
+       /**
+        * Checks two boundary results of {@code SegmentsUtil.find} on all-zero
+        * segments.
+        *
+        * <p>NOTE(review): the arguments are presumably (segments, offset,
+        * length) for the searched range followed by the same triple for the
+        * pattern - confirm against {@code SegmentsUtil.find}.
+        */
+       @Test
+       public void testFind() {
+               MemorySegment[] segments1 = new MemorySegment[2];
+               segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+               segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+               MemorySegment[] segments2 = new MemorySegment[3];
+               segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+               segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+               segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+               // with 0 as the last argument the given offset 34 is expected back
+               assertEquals(34, SegmentsUtil.find(segments1, 34, 0, segments2, 
0, 0));
+               // with 15 as the last argument no match (-1) is expected
+               assertEquals(-1, SegmentsUtil.find(segments1, 34, 0, segments2, 
0, 15));
        }
 
 }

Reply via email to