This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b62e22c40ea49936981ad3c8ccd0a2564bc88d86
Author: TsReaper <[email protected]>
AuthorDate: Thu Jul 18 15:52:55 2019 +0800

    [FLINK-13304][table-runtime-blink] Fix implementation of getString and getBinary methods in NestedRow
    
    This closes #9154
---
 .../apache/flink/table/dataformat/NestedRow.java   |  12 +-
 .../flink/table/dataformat/DataFormatTestUtil.java |  73 ++++++++++
 .../flink/table/dataformat/NestedRowTest.java      | 162 +++++++++++++++++++++
 3 files changed, 243 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 1ba5592..36fcf44 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -25,8 +25,12 @@ import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthIn
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * Its memory storage structure and {@link BinaryRow} exactly the same, the 
only different is it supports
- * all bytes in variable MemorySegments.
+ * Its memory storage structure is exactly the same as that of {@link BinaryRow}.
+ * The only difference is that, as {@link NestedRow} is used
+ * to store a row value in the variable-length part of a {@link BinaryRow},
+ * every field (including both the fixed-length part and the variable-length part) of {@link NestedRow}
+ * may cross the boundary of a memory segment, while the fixed-length part of {@link BinaryRow}
+ * must fit into its first memory segment.
  */
 public final class NestedRow extends BinaryFormat implements BaseRow {
 
@@ -219,7 +223,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
        public BinaryString getString(int pos) {
                assertIndexIsValid(pos);
                int fieldOffset = getFieldOffset(pos);
-               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               final long offsetAndLen = SegmentsUtil.getLong(segments, 
fieldOffset);
                return BinaryString.readBinaryStringFieldFromSegments(segments, 
offset, fieldOffset, offsetAndLen);
        }
 
@@ -247,7 +251,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
        public byte[] getBinary(int pos) {
                assertIndexIsValid(pos);
                int fieldOffset = getFieldOffset(pos);
-               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               final long offsetAndLen = SegmentsUtil.getLong(segments, 
fieldOffset);
                return readBinaryFieldFromSegments(segments, offset, 
fieldOffset, offsetAndLen);
        }
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
new file mode 100644
index 0000000..d89fa5b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+/**
+ * Utils for testing data formats.
+ */
+class DataFormatTestUtil {
+
+       /**
+        * Split the given byte array into two memory segments.
+        */
+       static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
+               int newSize = (bytes.length + 1) / 2 + baseOffset;
+               MemorySegment[] ret = new MemorySegment[2];
+               ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
+               ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+
+               ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
+               ret[1].put(0, bytes, newSize - baseOffset, bytes.length - 
(newSize - baseOffset));
+               return ret;
+       }
+
+       /**
+        * A simple class for testing getting / setting generic types on data formats.
+        */
+       static class MyObj {
+               public int i;
+               public double j;
+
+               MyObj(int i, double j) {
+                       this.i = i;
+                       this.j = j;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       MyObj myObj = (MyObj) o;
+
+                       return i == myObj.i && Double.compare(myObj.j, j) == 0;
+               }
+
+               @Override
+               public String toString() {
+                       return "MyObj{" + "i=" + i + ", j=" + j + '}';
+               }
+       }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
new file mode 100644
index 0000000..78a6ba0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.splitBytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link NestedRow}s.
+ */
+public class NestedRowTest {
+
+       @Test
+       public void testNestedRowWithOneSegment() {
+               BinaryRow row = getBinaryRow();
+               GenericTypeInfo<MyObj> info = new 
GenericTypeInfo<>(MyObj.class);
+               TypeSerializer<MyObj> genericSerializer = 
info.createSerializer(new ExecutionConfig());
+
+               BaseRow nestedRow = row.getRow(0, 5);
+               assertEquals(nestedRow.getInt(0), 1);
+               assertEquals(nestedRow.getLong(1), 5L);
+               assertEquals(nestedRow.getString(2), 
BinaryString.fromString("12345678"));
+               assertTrue(nestedRow.isNullAt(3));
+               assertEquals(new MyObj(15, 5),
+                       
BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), 
genericSerializer));
+       }
+
+       @Test
+       public void testNestedRowWithMultipleSegments() {
+               BinaryRow row = getBinaryRow();
+               GenericTypeInfo<MyObj> info = new 
GenericTypeInfo<>(MyObj.class);
+               TypeSerializer<MyObj> genericSerializer = 
info.createSerializer(new ExecutionConfig());
+
+               MemorySegment[] segments = 
splitBytes(row.getSegments()[0].getHeapMemory(), 3);
+               row.pointTo(segments, 3, row.getSizeInBytes());
+               {
+                       BaseRow nestedRow = row.getRow(0, 5);
+                       assertEquals(nestedRow.getInt(0), 1);
+                       assertEquals(nestedRow.getLong(1), 5L);
+                       assertEquals(nestedRow.getString(2), 
BinaryString.fromString("12345678"));
+                       assertTrue(nestedRow.isNullAt(3));
+                       assertEquals(new MyObj(15, 5),
+                               
BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), 
genericSerializer));
+               }
+       }
+
+       @Test
+       public void testNestInNestedRow() {
+               // layer1
+               GenericRow gRow = new GenericRow(4);
+               gRow.setField(0, 1);
+               gRow.setField(1, 5L);
+               gRow.setField(2, BinaryString.fromString("12345678"));
+               gRow.setField(3, null);
+
+               // layer2
+               BaseRowSerializer serializer = new BaseRowSerializer(
+                       new LogicalType[]{
+                               DataTypes.INT().getLogicalType(),
+                               DataTypes.BIGINT().getLogicalType(),
+                               DataTypes.STRING().getLogicalType(),
+                               DataTypes.STRING().getLogicalType()
+                       },
+                       new TypeSerializer[]{
+                               IntSerializer.INSTANCE,
+                               LongSerializer.INSTANCE,
+                               StringSerializer.INSTANCE,
+                               StringSerializer.INSTANCE
+                       });
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeString(0, BinaryString.fromString("hahahahafff"));
+               writer.writeRow(1, gRow, serializer);
+               writer.complete();
+
+               // layer3
+               BinaryRow row2 = new BinaryRow(1);
+               BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+               writer2.writeRow(0, row, null);
+               writer2.complete();
+
+               // verify
+               {
+                       NestedRow nestedRow = (NestedRow) row2.getRow(0, 2);
+                       BinaryRow binaryRow = new BinaryRow(2);
+                       binaryRow.pointTo(nestedRow.getSegments(), 
nestedRow.getOffset(),
+                               nestedRow.getSizeInBytes());
+                       assertEquals(binaryRow, row);
+               }
+
+               assertEquals(row2.getRow(0, 2).getString(0), 
BinaryString.fromString("hahahahafff"));
+               BaseRow nestedRow = row2.getRow(0, 2).getRow(1, 4);
+               assertEquals(nestedRow.getInt(0), 1);
+               assertEquals(nestedRow.getLong(1), 5L);
+               assertEquals(nestedRow.getString(2), 
BinaryString.fromString("12345678"));
+               assertTrue(nestedRow.isNullAt(3));
+       }
+
+       private BinaryRow getBinaryRow() {
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+
+               GenericTypeInfo<MyObj> info = new 
GenericTypeInfo<>(MyObj.class);
+               TypeSerializer<MyObj> genericSerializer = 
info.createSerializer(new ExecutionConfig());
+               GenericRow gRow = new GenericRow(5);
+               gRow.setField(0, 1);
+               gRow.setField(1, 5L);
+               gRow.setField(2, BinaryString.fromString("12345678"));
+               gRow.setField(3, null);
+               gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), 
genericSerializer));
+
+               BaseRowSerializer serializer = new BaseRowSerializer(
+                       new LogicalType[]{
+                               DataTypes.INT().getLogicalType(),
+                               DataTypes.BIGINT().getLogicalType(),
+                               DataTypes.STRING().getLogicalType(),
+                               DataTypes.STRING().getLogicalType(),
+                               DataTypes.ANY(info).getLogicalType()
+                       },
+                       new TypeSerializer[]{
+                               IntSerializer.INSTANCE,
+                               LongSerializer.INSTANCE,
+                               StringSerializer.INSTANCE,
+                               StringSerializer.INSTANCE,
+                               genericSerializer
+                       });
+               writer.writeRow(0, gRow, serializer);
+               writer.complete();
+
+               return row;
+       }
+}

Reply via email to