This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b62e22c40ea49936981ad3c8ccd0a2564bc88d86 Author: TsReaper <[email protected]> AuthorDate: Thu Jul 18 15:52:55 2019 +0800 [FLINK-13304][table-runtime-blink] Fix implementation of getString and getBinary method in NestedRow This closes #9154 --- .../apache/flink/table/dataformat/NestedRow.java | 12 +- .../flink/table/dataformat/DataFormatTestUtil.java | 73 ++++++++++ .../flink/table/dataformat/NestedRowTest.java | 162 +++++++++++++++++++++ 3 files changed, 243 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java index 1ba5592..36fcf44 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java @@ -25,8 +25,12 @@ import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthIn import static org.apache.flink.util.Preconditions.checkArgument; /** - * Its memory storage structure and {@link BinaryRow} exactly the same, the only different is it supports - * all bytes in variable MemorySegments. + * Its memory storage structure is exactly the same as {@link BinaryRow}. + * The only difference is that, as {@link NestedRow} is used + * to store row value in the variable-length part of {@link BinaryRow}, + * every field (including both fixed-length part and variable-length part) of {@link NestedRow} + * may cross the boundary of a segment, while the fixed-length part of {@link BinaryRow} + * must fit into its first memory segment. 
*/ public final class NestedRow extends BinaryFormat implements BaseRow { @@ -219,7 +223,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow { public BinaryString getString(int pos) { assertIndexIsValid(pos); int fieldOffset = getFieldOffset(pos); - final long offsetAndLen = segments[0].getLong(fieldOffset); + final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset); return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen); } @@ -247,7 +251,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow { public byte[] getBinary(int pos) { assertIndexIsValid(pos); int fieldOffset = getFieldOffset(pos); - final long offsetAndLen = segments[0].getLong(fieldOffset); + final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset); return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java new file mode 100644 index 0000000..d89fa5b --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +/** + * Utils for testing data formats. + */ +class DataFormatTestUtil { + + /** + * Split the given byte array into two memory segments. + */ + static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) { + int newSize = (bytes.length + 1) / 2 + baseOffset; + MemorySegment[] ret = new MemorySegment[2]; + ret[0] = MemorySegmentFactory.wrap(new byte[newSize]); + ret[1] = MemorySegmentFactory.wrap(new byte[newSize]); + + ret[0].put(baseOffset, bytes, 0, newSize - baseOffset); + ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset)); + return ret; + } + + /** + * A simple class for testing generic type getting / setting on data formats. 
+ */ + static class MyObj { + public int i; + public double j; + + MyObj(int i, double j) { + this.i = i; + this.j = j; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MyObj myObj = (MyObj) o; + + return i == myObj.i && Double.compare(myObj.j, j) == 0; + } + + @Override + public String toString() { + return "MyObj{" + "i=" + i + ", j=" + j + '}'; + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java new file mode 100644 index 0000000..78a6ba0 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.typeutils.BaseRowSerializer; + +import org.junit.Test; + +import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj; +import static org.apache.flink.table.dataformat.DataFormatTestUtil.splitBytes; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for {@link NestedRow}s. + */ +public class NestedRowTest { + + @Test + public void testNestedRowWithOneSegment() { + BinaryRow row = getBinaryRow(); + GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class); + TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig()); + + BaseRow nestedRow = row.getRow(0, 5); + assertEquals(nestedRow.getInt(0), 1); + assertEquals(nestedRow.getLong(1), 5L); + assertEquals(nestedRow.getString(2), BinaryString.fromString("12345678")); + assertTrue(nestedRow.isNullAt(3)); + assertEquals(new MyObj(15, 5), + BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer)); + } + + @Test + public void testNestedRowWithMultipleSegments() { + BinaryRow row = getBinaryRow(); + GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class); + TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig()); + + MemorySegment[] segments = splitBytes(row.getSegments()[0].getHeapMemory(), 3); + row.pointTo(segments, 3, row.getSizeInBytes()); 
+ { + BaseRow nestedRow = row.getRow(0, 5); + assertEquals(nestedRow.getInt(0), 1); + assertEquals(nestedRow.getLong(1), 5L); + assertEquals(nestedRow.getString(2), BinaryString.fromString("12345678")); + assertTrue(nestedRow.isNullAt(3)); + assertEquals(new MyObj(15, 5), + BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer)); + } + } + + @Test + public void testNestInNestedRow() { + // layer1 + GenericRow gRow = new GenericRow(4); + gRow.setField(0, 1); + gRow.setField(1, 5L); + gRow.setField(2, BinaryString.fromString("12345678")); + gRow.setField(3, null); + + // layer2 + BaseRowSerializer serializer = new BaseRowSerializer( + new LogicalType[]{ + DataTypes.INT().getLogicalType(), + DataTypes.BIGINT().getLogicalType(), + DataTypes.STRING().getLogicalType(), + DataTypes.STRING().getLogicalType() + }, + new TypeSerializer[]{ + IntSerializer.INSTANCE, + LongSerializer.INSTANCE, + StringSerializer.INSTANCE, + StringSerializer.INSTANCE + }); + BinaryRow row = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeString(0, BinaryString.fromString("hahahahafff")); + writer.writeRow(1, gRow, serializer); + writer.complete(); + + // layer3 + BinaryRow row2 = new BinaryRow(1); + BinaryRowWriter writer2 = new BinaryRowWriter(row2); + writer2.writeRow(0, row, null); + writer2.complete(); + + // verify + { + NestedRow nestedRow = (NestedRow) row2.getRow(0, 2); + BinaryRow binaryRow = new BinaryRow(2); + binaryRow.pointTo(nestedRow.getSegments(), nestedRow.getOffset(), + nestedRow.getSizeInBytes()); + assertEquals(binaryRow, row); + } + + assertEquals(row2.getRow(0, 2).getString(0), BinaryString.fromString("hahahahafff")); + BaseRow nestedRow = row2.getRow(0, 2).getRow(1, 4); + assertEquals(nestedRow.getInt(0), 1); + assertEquals(nestedRow.getLong(1), 5L); + assertEquals(nestedRow.getString(2), BinaryString.fromString("12345678")); + assertTrue(nestedRow.isNullAt(3)); + } + + private BinaryRow getBinaryRow() 
{ + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + + GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class); + TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig()); + GenericRow gRow = new GenericRow(5); + gRow.setField(0, 1); + gRow.setField(1, 5L); + gRow.setField(2, BinaryString.fromString("12345678")); + gRow.setField(3, null); + gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), genericSerializer)); + + BaseRowSerializer serializer = new BaseRowSerializer( + new LogicalType[]{ + DataTypes.INT().getLogicalType(), + DataTypes.BIGINT().getLogicalType(), + DataTypes.STRING().getLogicalType(), + DataTypes.STRING().getLogicalType(), + DataTypes.ANY(info).getLogicalType() + }, + new TypeSerializer[]{ + IntSerializer.INSTANCE, + LongSerializer.INSTANCE, + StringSerializer.INSTANCE, + StringSerializer.INSTANCE, + genericSerializer + }); + writer.writeRow(0, gRow, serializer); + writer.complete(); + + return row; + } +}
