This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-2079 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 04053f3a09ea56bb42ebe67f16cbaa728b9e36bc Author: forwardxu <[email protected]> AuthorDate: Tue Dec 23 14:23:51 2025 +0800 fix failed cases --- fluss-client/pom.xml | 8 - .../scanner/log/DefaultCompletedFetchTest.java | 28 ++- .../java/org/apache/fluss/row/BinaryArrayTest.java | 32 +++ .../apache/fluss/row/TestInternalRowGenerator.java | 23 +- .../apache/fluss/row/aligned/AlignedRowTest.java | 76 ++++++ .../fluss/row/compacted/CompactedRowTest.java | 95 ++++++++ .../fluss/row/encode/AlignedRowEncoderTest.java | 252 ++++++++++++++++++++ .../fluss/row/indexed/IndexedRowReaderTest.java | 15 +- .../apache/fluss/row/indexed/IndexedRowTest.java | 152 +++++++++++- .../org/apache/fluss/testutils/DataTestUtils.java | 51 +++- fluss-filesystems/fluss-fs-obs/pom.xml | 2 - fluss-filesystems/fluss-fs-oss/pom.xml | 2 - fluss-flink/fluss-flink-1.18/pom.xml | 8 - fluss-flink/fluss-flink-1.19/pom.xml | 8 - fluss-flink/fluss-flink-1.20/pom.xml | 8 - fluss-flink/fluss-flink-2.1/pom.xml | 8 - .../fluss/flink/utils/FlinkArrayConverter.java | 262 --------------------- .../fluss/flink/utils/FlinkRowConverter.java | 213 ----------------- .../flink/utils/FlussRowToFlinkRowConverter.java | 42 +++- .../fluss/flink/sink/FlinkComplexTypeITCase.java | 26 ++ .../utils/FlinkRowToFlussRowConverterTest.java | 25 +- .../utils/FlussRowToFlinkRowConverterTest.java | 35 ++- .../iceberg/source/IcebergRecordAsFlussRow.java | 3 +- fluss-lake/fluss-lake-lance/pom.xml | 8 - fluss-server/pom.xml | 8 - pom.xml | 3 + 26 files changed, 827 insertions(+), 566 deletions(-) diff --git a/fluss-client/pom.xml b/fluss-client/pom.xml index 456febeec..481bf6ecd 100644 --- a/fluss-client/pom.xml +++ b/fluss-client/pom.xml @@ -113,14 +113,6 @@ <include>*:*</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> </configuration> </execution> </executions> diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index 72739ef26..e5b7eeff7 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -227,21 +227,32 @@ public class DefaultCompletedFetchTest { new Object[] { 1, new String[] {"a", "b"}, - new Object[] {new int[] {1, 2}, new int[] {3, 4}} + new Object[] {new int[] {1, 2}, new int[] {3, 4}}, + new Object[] {10, new Object[] {20, "nested"}, "row1"} }, new Object[] { - 2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}} + 2, + new String[] {"c", null}, + new Object[] {null, new int[] {3, 4}}, + new Object[] {30, new Object[] {40, "test"}, "row2"} }, new Object[] { 3, new String[] {"e", "f"}, - new Object[] {new int[] {5, 6, 7}, new int[] {8}} + new Object[] {new int[] {5, 6, 7}, new int[] {8}}, + new Object[] {50, new Object[] {60, "value"}, "row3"} }); Schema schema = Schema.newBuilder() .column("a", DataTypes.INT()) .column("b", DataTypes.ARRAY(DataTypes.STRING())) .column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))) + .column( + "d", + DataTypes.ROW( + DataTypes.INT(), + DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()), + DataTypes.STRING())) .build(); TableInfo tableInfo = TableInfo.of( @@ -299,6 +310,17 @@ public class DefaultCompletedFetchTest { .isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1])); assertThat(row.getArray(2).toString()) .isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2])); + InternalRow nestedRow = row.getRow(3, 3); + assertThat(nestedRow).isNotNull(); + assertThat(nestedRow.getInt(0)).isEqualTo(((Object[]) complexData.get(i)[3])[0]); + InternalRow deeplyNestedRow = nestedRow.getRow(1, 2); + assertThat(deeplyNestedRow).isNotNull(); + assertThat(deeplyNestedRow.getInt(0)) + .isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[0]); + assertThat(deeplyNestedRow.getString(1).toString()) + .isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]); + assertThat(nestedRow.getString(2).toString()) + .isEqualTo(((Object[]) complexData.get(i)[3])[2]); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java index 14230b62e..d3054ead6 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java @@ -929,4 +929,36 @@ public class BinaryArrayTest { assertThat(BinaryArray.calculateFixLengthPartSize(DataTypes.CHAR(10))).isEqualTo(8); assertThat(BinaryArray.calculateFixLengthPartSize(DataTypes.BINARY(20))).isEqualTo(8); } + + @Test + public void testPrimitiveBinaryArrayGetRowThrowsException() { + PrimitiveBinaryArray array = new PrimitiveBinaryArray(); + BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); + writer.writeInt(0, 10); + writer.writeInt(1, 20); + writer.writeInt(2, 30); + writer.complete(); + + assertThatThrownBy(() -> array.getRow(0, 2)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Can not get nested row from array of primitive type"); + } + + @Test + public void testPrimitiveBinaryArrayGetArray() { + PrimitiveBinaryArray innerArray1 = new PrimitiveBinaryArray(); + BinaryArrayWriter innerWriter1 = new BinaryArrayWriter(innerArray1, 2, 4); + innerWriter1.writeInt(0, 10); + innerWriter1.writeInt(1, 20); + innerWriter1.complete(); + + PrimitiveBinaryArray outerArray = new PrimitiveBinaryArray(); + BinaryArrayWriter outerWriter = new BinaryArrayWriter(outerArray, 1, 8); + outerWriter.setOffsetAndSize(0, 0, innerArray1.getSizeInBytes()); + outerWriter.complete(); + + InternalArray result = outerArray.getArray(0); + assertThat(result).isNotNull(); + assertThat(result).isInstanceOf(PrimitiveBinaryArray.class); + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java index d6d1b6a15..a9e08315d 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java @@ -63,9 +63,15 @@ public class TestInternalRowGenerator { new DataField("f17", DataTypes.TIMESTAMP_LTZ(1)), new DataField("f18", DataTypes.TIMESTAMP_LTZ(5)), new DataField("f19", DataTypes.ARRAY(DataTypes.INT())), - // TODO: Add Map and Row fields in Issue #1973 new DataField( "f20", + DataTypes.ARRAY(DataTypes.FLOAT().copy(false))), // vector embedding type + new DataField( + "f21", + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))), // nested array + // TODO: Add Map and Row fields in Issue #1973 + new DataField( + "f22", DataTypes.ROW( new DataField("u1", DataTypes.INT()), new DataField( @@ -123,6 +129,17 @@ public class TestInternalRowGenerator { GenericArray array1 = GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234); setRandomNull(writers[19], writer, 19, rnd, array1); + GenericArray array2 = + GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE); + setRandomNull(writers[20], writer, 20, rnd, array2); + + GenericArray array3 = + GenericArray.of( + GenericArray.of(fromString("a"), null, fromString("c")), + null, + GenericArray.of(fromString("hello"), fromString("world"))); + setRandomNull(writers[21], writer, 21, rnd, array3); + // TODO: Map type support will be added in Issue #1973 // Map<Object, Object> javaMap = new HashMap<>(); // javaMap.put(0, null); @@ -131,9 +148,9 @@ public class TestInternalRowGenerator { // GenericMap map = new GenericMap(javaMap); // setRandomNull(writers[20], writer, 20, rnd, map); - GenericRow innerRow = GenericRow.of(20); + GenericRow innerRow = GenericRow.of(22); GenericRow genericRow = GenericRow.of(123, innerRow, BinaryString.fromString("Test")); - setRandomNull(writers[20], writer, 20, rnd, genericRow); + setRandomNull(writers[22], writer, 22, rnd, genericRow); IndexedRow row = new IndexedRow(dataTypes); row.pointTo(writer.segment(), 0, writer.position()); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java index 519fa2fd8..c9a8be0ce 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java @@ -21,6 +21,8 @@ import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.BinaryWriter; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.types.DataType; @@ -744,4 +746,78 @@ class AlignedRowTest { assertThat(row.getTimestampNtz(10, 9).toString()).contains("2021-01-01T00:00:00.000123456"); assertThat(row.isNullAt(11)).isTrue(); } + + @Test + public void testAlignedArrayGetRow() { + DataType rowType = + DataTypes.ROW( + DataTypes.FIELD("f1", DataTypes.INT()), + DataTypes.FIELD("f2", DataTypes.STRING())); + DataType arrayType = DataTypes.ARRAY(rowType); + + AlignedRow outerRow = new AlignedRow(1); + AlignedRowWriter outerWriter = new AlignedRowWriter(outerRow); + + org.apache.fluss.row.GenericRow row1 = + org.apache.fluss.row.GenericRow.of(100, BinaryString.fromString("nested")); + org.apache.fluss.row.GenericRow row2 = + org.apache.fluss.row.GenericRow.of(200, BinaryString.fromString("nested2")); + org.apache.fluss.row.GenericArray arrayData = + org.apache.fluss.row.GenericArray.of(row1, row2); + + BinaryWriter.ValueWriter arrayWriter = BinaryWriter.createValueWriter(arrayType, ALIGNED); + arrayWriter.writeValue(outerWriter, 0, arrayData); + outerWriter.complete(); + + InternalArray array = outerRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(2); + + InternalRow nestedRow1 = array.getRow(0, 2); + assertThat(nestedRow1.getInt(0)).isEqualTo(100); + assertThat(nestedRow1.getString(1)).isEqualTo(BinaryString.fromString("nested")); + + InternalRow nestedRow2 = array.getRow(1, 2); + assertThat(nestedRow2.getInt(0)).isEqualTo(200); + assertThat(nestedRow2.getString(1)).isEqualTo(BinaryString.fromString("nested2")); + } + + @Test + public void testAlignedArrayGetNestedArray() { + DataType innerArrayType = DataTypes.ARRAY(DataTypes.INT()); + DataType outerArrayType = DataTypes.ARRAY(innerArrayType); + + AlignedRow outerRow = new AlignedRow(1); + AlignedRowWriter outerWriter = new AlignedRowWriter(outerRow); + + org.apache.fluss.row.GenericArray innerArray1 = + org.apache.fluss.row.GenericArray.of(10, 20, 30); + org.apache.fluss.row.GenericArray innerArray2 = + org.apache.fluss.row.GenericArray.of(40, 50, 60); + org.apache.fluss.row.GenericArray outerArrayData = + org.apache.fluss.row.GenericArray.of(innerArray1, innerArray2); + + BinaryWriter.ValueWriter arrayWriter = + BinaryWriter.createValueWriter(outerArrayType, ALIGNED); + arrayWriter.writeValue(outerWriter, 0, outerArrayData); + outerWriter.complete(); + + InternalArray outerArray = outerRow.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray nestedArray1 = outerArray.getArray(0); + assertThat(nestedArray1).isNotNull(); + assertThat(nestedArray1.size()).isEqualTo(3); + assertThat(nestedArray1.getInt(0)).isEqualTo(10); + assertThat(nestedArray1.getInt(1)).isEqualTo(20); + assertThat(nestedArray1.getInt(2)).isEqualTo(30); + + InternalArray nestedArray2 = outerArray.getArray(1); + assertThat(nestedArray2).isNotNull(); + assertThat(nestedArray2.size()).isEqualTo(3); + assertThat(nestedArray2.getInt(0)).isEqualTo(40); + assertThat(nestedArray2.getInt(1)).isEqualTo(50); + assertThat(nestedArray2.getInt(2)).isEqualTo(60); + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java index 72382f67a..ea1bdd11f 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java @@ -28,6 +28,7 @@ import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link CompactedRow}. */ public class CompactedRowTest { @@ -412,4 +413,98 @@ public class CompactedRowTest { assertThat(row.getInt(i)).isEqualTo(i * 10); } } + + @Test + public void testCompactedArrayGetRowWithInvalidType() { + DataType arrayType = DataTypes.ARRAY(DataTypes.INT()); + DataType[] fieldTypes = {arrayType}; + + CompactedRow outerRow = new CompactedRow(fieldTypes); + CompactedRowWriter outerWriter = new CompactedRowWriter(fieldTypes.length); + + org.apache.fluss.row.GenericArray arrayData = org.apache.fluss.row.GenericArray.of(1, 2, 3); + org.apache.fluss.row.serializer.ArraySerializer serializer = + new org.apache.fluss.row.serializer.ArraySerializer( + DataTypes.INT(), org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED); + outerWriter.writeArray(arrayData, serializer); + + outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position()); + + org.apache.fluss.row.InternalArray array = outerRow.getArray(0); + assertThatThrownBy(() -> array.getRow(0, 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Can not get row from Array of type"); + } + + @Test + public void testCompactedArrayGetRowWithWrongNumFields() { + DataType rowType = + DataTypes.ROW( + DataTypes.FIELD("f1", DataTypes.INT()), + DataTypes.FIELD("f2", DataTypes.STRING())); + DataType arrayType = DataTypes.ARRAY(rowType); + DataType[] fieldTypes = {arrayType}; + + CompactedRow outerRow = new CompactedRow(fieldTypes); + CompactedRowWriter outerWriter = new CompactedRowWriter(fieldTypes.length); + + org.apache.fluss.row.GenericRow innerRow = + org.apache.fluss.row.GenericRow.of(100, BinaryString.fromString("test")); + org.apache.fluss.row.GenericArray arrayData = + org.apache.fluss.row.GenericArray.of(innerRow); + + org.apache.fluss.row.serializer.ArraySerializer serializer = + new org.apache.fluss.row.serializer.ArraySerializer( + rowType, org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED); + outerWriter.writeArray(arrayData, serializer); + + outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position()); + + org.apache.fluss.row.InternalArray array = outerRow.getArray(0); + assertThatThrownBy(() -> array.getRow(0, 5)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unexpected number of fields"); + } + + @Test + public void testCompactedArrayGetNestedArray() { + DataType innerArrayType = DataTypes.ARRAY(DataTypes.INT()); + DataType outerArrayType = DataTypes.ARRAY(innerArrayType); + DataType[] fieldTypes = {outerArrayType}; + + CompactedRow outerRow = new CompactedRow(fieldTypes); + CompactedRowWriter outerWriter = new CompactedRowWriter(fieldTypes.length); + + org.apache.fluss.row.GenericArray innerArray1 = + org.apache.fluss.row.GenericArray.of(10, 20, 30); + org.apache.fluss.row.GenericArray innerArray2 = + org.apache.fluss.row.GenericArray.of(40, 50, 60); + org.apache.fluss.row.GenericArray outerArrayData = + org.apache.fluss.row.GenericArray.of(innerArray1, innerArray2); + + org.apache.fluss.row.serializer.ArraySerializer serializer = + new org.apache.fluss.row.serializer.ArraySerializer( + innerArrayType, org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED); + outerWriter.writeArray(outerArrayData, serializer); + + outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position()); + + org.apache.fluss.row.InternalArray outerArray = outerRow.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + org.apache.fluss.row.InternalArray nestedArray1 = outerArray.getArray(0); + assertThat(nestedArray1).isNotNull(); + assertThat(nestedArray1.size()).isEqualTo(3); + assertThat(nestedArray1.getInt(0)).isEqualTo(10); + assertThat(nestedArray1.getInt(1)).isEqualTo(20); + assertThat(nestedArray1.getInt(2)).isEqualTo(30); + + org.apache.fluss.row.InternalArray nestedArray2 = outerArray.getArray(1); + assertThat(nestedArray2).isNotNull(); + assertThat(nestedArray2.size()).isEqualTo(3); + assertThat(nestedArray2.getInt(0)).isEqualTo(40); + assertThat(nestedArray2.getInt(1)).isEqualTo(50); + assertThat(nestedArray2.getInt(2)).isEqualTo(60); + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/row/encode/AlignedRowEncoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/encode/AlignedRowEncoderTest.java new file mode 100644 index 000000000..be0c31f99 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/row/encode/AlignedRowEncoderTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.encode; + +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AlignedRowEncoder}. */ +class AlignedRowEncoderTest { + + @Test + void testEncodeSimpleTypes() throws Exception { + DataType[] fieldTypes = + new DataType[] { + DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BOOLEAN() + }; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + encoder.encodeField(0, 100); + encoder.encodeField(1, 200L); + encoder.encodeField(2, BinaryString.fromString("test")); + encoder.encodeField(3, true); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getInt(0)).isEqualTo(100); + assertThat(row.getLong(1)).isEqualTo(200L); + assertThat(row.getString(2)).isEqualTo(BinaryString.fromString("test")); + assertThat(row.getBoolean(3)).isTrue(); + } + } + + @Test + void testEncodeMultipleRows() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + encoder.encodeField(0, 1); + encoder.encodeField(1, BinaryString.fromString("first")); + BinaryRow row1 = encoder.finishRow(); + BinaryRow row1Copy = row1.copy(); + + encoder.startNewRow(); + encoder.encodeField(0, 2); + encoder.encodeField(1, BinaryString.fromString("second")); + BinaryRow row2 = encoder.finishRow(); + + assertThat(row1Copy.getInt(0)).isEqualTo(1); + assertThat(row1Copy.getString(1)).isEqualTo(BinaryString.fromString("first")); + + assertThat(row2.getInt(0)).isEqualTo(2); + assertThat(row2.getString(1)).isEqualTo(BinaryString.fromString("second")); + } + } + + @Test + void testEncodeAllPrimitiveDataTypes() throws Exception { + DataType[] fieldTypes = + new DataType[] { + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.BOOLEAN(), + DataTypes.STRING(), + DataTypes.BYTES(), + DataTypes.DECIMAL(10, 2) + }; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + encoder.encodeField(0, (byte) 1); + encoder.encodeField(1, (short) 100); + encoder.encodeField(2, 1000); + encoder.encodeField(3, 10000L); + encoder.encodeField(4, 1.5f); + encoder.encodeField(5, 2.5); + encoder.encodeField(6, false); + encoder.encodeField(7, BinaryString.fromString("hello")); + encoder.encodeField(8, new byte[] {1, 2, 3}); + encoder.encodeField(9, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getByte(0)).isEqualTo((byte) 1); + assertThat(row.getShort(1)).isEqualTo((short) 100); + assertThat(row.getInt(2)).isEqualTo(1000); + assertThat(row.getLong(3)).isEqualTo(10000L); + assertThat(row.getFloat(4)).isEqualTo(1.5f); + assertThat(row.getDouble(5)).isEqualTo(2.5); + assertThat(row.getBoolean(6)).isFalse(); + assertThat(row.getString(7)).isEqualTo(BinaryString.fromString("hello")); + assertThat(row.getBytes(8)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(row.getDecimal(9, 10, 2)) + .isEqualTo(Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + } + } + + @Test + void testEncodeWithNullValues() throws Exception { + DataType[] fieldTypes = + new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + encoder.encodeField(0, 42); + encoder.encodeField(1, null); + encoder.encodeField(2, 100L); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getInt(0)).isEqualTo(42); + assertThat(row.isNullAt(1)).isTrue(); + assertThat(row.getLong(2)).isEqualTo(100L); + } + } + + @Test + void testEncodeTimestampTypes() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.TIMESTAMP_LTZ(3), DataTypes.INT()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + TimestampLtz timestampLtz = TimestampLtz.fromEpochMillis(1000000L); + + encoder.startNewRow(); + encoder.encodeField(0, timestampLtz); + encoder.encodeField(1, 999); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getTimestampLtz(0, 3)).isNotNull(); + assertThat(row.getInt(1)).isEqualTo(999); + } + } + + @Test + void testReuseEncoder() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + encoder.encodeField(0, 5); + encoder.encodeField(1, BinaryString.fromString("row5")); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getInt(0)).isEqualTo(5); + assertThat(row.getString(1)).isEqualTo(BinaryString.fromString("row5")); + + encoder.startNewRow(); + encoder.encodeField(0, 10); + encoder.encodeField(1, BinaryString.fromString("row10")); + row = encoder.finishRow(); + + assertThat(row.getInt(0)).isEqualTo(10); + assertThat(row.getString(1)).isEqualTo(BinaryString.fromString("row10")); + } + } + + @Test + void testEncodeEmptyRow() throws Exception { + DataType[] fieldTypes = new DataType[] {}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getFieldCount()).isEqualTo(0); + } + } + + @Test + void testEncodeSingleField() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.STRING()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + encoder.startNewRow(); + encoder.encodeField(0, BinaryString.fromString("single")); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("single")); + } + } + + @Test + void testClose() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.INT()}; + AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes); + encoder.close(); + } + + @Test + void testEncodeWithLargeStrings() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.STRING(), DataTypes.INT()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + sb.append("test"); + } + String largeString = sb.toString(); + + encoder.startNewRow(); + encoder.encodeField(0, BinaryString.fromString(largeString)); + encoder.encodeField(1, 123); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getString(0)).isEqualTo(BinaryString.fromString(largeString)); + assertThat(row.getInt(1)).isEqualTo(123); + } + } + + @Test + void testEncodeWithBinaryData() throws Exception { + DataType[] fieldTypes = new DataType[] {DataTypes.BYTES(), DataTypes.BYTES()}; + try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) { + byte[] data1 = new byte[100]; + byte[] data2 = new byte[200]; + for (int i = 0; i < data1.length; i++) { + data1[i] = (byte) i; + } + for (int i = 0; i < data2.length; i++) { + data2[i] = (byte) (i % 256); + } + + encoder.startNewRow(); + encoder.encodeField(0, data1); + encoder.encodeField(1, data2); + BinaryRow row = encoder.finishRow(); + + assertThat(row.getBytes(0)).isEqualTo(data1); + assertThat(row.getBytes(1)).isEqualTo(data2); + } + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java index bb36c1f77..56a307563 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java @@ -109,9 +109,20 @@ public class IndexedRowReaderTest { assertThatArray(reader.readArray(dataTypes[19])) .withElementType(DataTypes.INT()) .isEqualTo(GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234)); + assertThatArray(reader.readArray(dataTypes[20])) + .withElementType(DataTypes.FLOAT()) + .isEqualTo( + GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE)); + assertThatArray(reader.readArray(dataTypes[21])) + .withElementType(DataTypes.ARRAY(DataTypes.STRING())) + .isEqualTo( + GenericArray.of( + GenericArray.of(fromString("a"), null, fromString("c")), + null, + GenericArray.of(fromString("hello"), fromString("world")))); InternalRow nestedRow = - reader.readRow(dataTypes[20].getChildren().toArray(new DataType[0])); - GenericRow expectedInnerRow = GenericRow.of(20); + reader.readRow(dataTypes[22].getChildren().toArray(new DataType[0])); + GenericRow expectedInnerRow = GenericRow.of(22); GenericRow expectedNestedRow = GenericRow.of(123, expectedInnerRow, fromString("Test")); assertThatRow(nestedRow) .withSchema( diff --git a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java index 9aea09e38..3e4066991 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.row.BinaryWriter; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; @@ -74,7 +75,7 @@ public class IndexedRowTest { assertAllTypeEquals(row); - assertThat(row.getFieldCount()).isEqualTo(21); + assertThat(row.getFieldCount()).isEqualTo(23); assertThat(row.anyNull()).isFalse(); assertThat(row.anyNull(new int[] {0, 1})).isFalse(); } @@ -204,10 +205,21 @@ public class IndexedRowTest { writers[17].writeValue(writer, 17, TimestampLtz.fromEpochMillis(1698235273182L)); writers[18].writeValue(writer, 18, TimestampLtz.fromEpochMillis(1698235273182L)); writers[19].writeValue(writer, 19, GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234)); - - GenericRow innerRow = GenericRow.of(20); + writers[20].writeValue( + writer, + 20, + GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE)); + writers[21].writeValue( + writer, + 21, + GenericArray.of( + GenericArray.of(fromString("a"), null, fromString("c")), + null, + GenericArray.of(fromString("hello"), fromString("world")))); + + GenericRow innerRow = GenericRow.of(22); GenericRow nestedRow = GenericRow.of(123, innerRow, fromString("Test")); - writers[20].writeValue(writer, 20, nestedRow); + writers[22].writeValue(writer, 22, nestedRow); return writer; } @@ -236,9 +248,20 @@ public class IndexedRowTest { assertThatArray(row.getArray(19)) .withElementType(DataTypes.INT()) .isEqualTo(GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234)); - GenericRow expectedInnerRow = GenericRow.of(20); + assertThatArray(row.getArray(20)) + .withElementType(DataTypes.FLOAT().copy(false)) + .isEqualTo( + GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE)); + assertThatArray(row.getArray(21)) + .withElementType(DataTypes.ARRAY(DataTypes.STRING())) + .isEqualTo( + GenericArray.of( + GenericArray.of(fromString("a"), null, fromString("c")), + null, + GenericArray.of(fromString("hello"), fromString("world")))); + GenericRow expectedInnerRow = GenericRow.of(22); GenericRow expectedNestedRow = GenericRow.of(123, expectedInnerRow, fromString("Test")); - assertThatRow(row.getRow(20, 3)) + assertThatRow(row.getRow(22, 3)) .withSchema( DataTypes.ROW( DataTypes.FIELD("u1", DataTypes.INT()), @@ -248,4 +271,121 @@ public class IndexedRowTest { DataTypes.FIELD("u3", DataTypes.STRING()))) .isEqualTo(expectedNestedRow); } + + @Test + void testIndexedArrayGetRow() { + DataType rowType = + DataTypes.ROW( + DataTypes.FIELD("f1", DataTypes.INT()), + DataTypes.FIELD("f2", DataTypes.STRING())); + + DataType[] fieldTypes = new DataType[] {DataTypes.ARRAY(rowType)}; + IndexedRow outerRow = new IndexedRow(fieldTypes); + IndexedRowWriter outerWriter = new IndexedRowWriter(fieldTypes); + + GenericRow row1 = GenericRow.of(100, fromString("first")); + GenericRow row2 = GenericRow.of(200, fromString("second")); + GenericArray arrayData = GenericArray.of(row1, row2); + + BinaryWriter.ValueWriter arrayWriter = + BinaryWriter.createValueWriter(fieldTypes[0], INDEXED); + arrayWriter.writeValue(outerWriter, 0, arrayData); + + outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position()); + + InternalArray array = outerRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(2); + + InternalRow nestedRow1 = array.getRow(0, 2); + assertThat(nestedRow1.getInt(0)).isEqualTo(100); + assertThat(nestedRow1.getString(1)).isEqualTo(fromString("first")); + + InternalRow nestedRow2 = array.getRow(1, 2); + assertThat(nestedRow2.getInt(0)).isEqualTo(200); + assertThat(nestedRow2.getString(1)).isEqualTo(fromString("second")); + } + + @Test + void testIndexedArrayGetRowWithInvalidType() { + DataType[] fieldTypes = new DataType[] {DataTypes.ARRAY(DataTypes.INT())}; + IndexedRow outerRow = new IndexedRow(fieldTypes); + IndexedRowWriter outerWriter = new IndexedRowWriter(fieldTypes); + + GenericArray arrayData = GenericArray.of(1, 2, 3); + BinaryWriter.ValueWriter arrayWriter = + BinaryWriter.createValueWriter(fieldTypes[0], INDEXED); + arrayWriter.writeValue(outerWriter, 0, arrayData); + + outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position()); + + InternalArray array = outerRow.getArray(0); + assertThatThrownBy(() -> array.getRow(0, 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Can not get row from Array of type"); + } + + @Test + void testIndexedArrayGetRowWithWrongNumFields() { + DataType rowType = + DataTypes.ROW( + DataTypes.FIELD("f1", DataTypes.INT()), + DataTypes.FIELD("f2", DataTypes.STRING())); + + DataType[] fieldTypes = new DataType[] {DataTypes.ARRAY(rowType)}; + IndexedRow outerRow = new IndexedRow(fieldTypes); + IndexedRowWriter outerWriter = new IndexedRowWriter(fieldTypes); + + GenericRow row1 = GenericRow.of(100, fromString("first")); + GenericArray arrayData = GenericArray.of(row1); + + BinaryWriter.ValueWriter arrayWriter = + BinaryWriter.createValueWriter(fieldTypes[0], INDEXED); + arrayWriter.writeValue(outerWriter, 0, arrayData); + + outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position()); + + InternalArray array = outerRow.getArray(0); + assertThatThrownBy(() -> array.getRow(0, 5)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unexpected number of fields"); + } + + @Test + void testIndexedArrayGetNestedArray() { + DataType innerArrayType = DataTypes.ARRAY(DataTypes.INT()); + DataType outerArrayType = DataTypes.ARRAY(innerArrayType); + + DataType[] fieldTypes = new DataType[] {outerArrayType}; + IndexedRow row = new IndexedRow(fieldTypes); + IndexedRowWriter rowWriter = new IndexedRowWriter(fieldTypes); + + GenericArray innerArray1 = GenericArray.of(1, 2, 3); + GenericArray innerArray2 = GenericArray.of(4, 5, 6); + GenericArray outerArrayData = GenericArray.of(innerArray1, innerArray2); + + BinaryWriter.ValueWriter arrayWriter = + BinaryWriter.createValueWriter(fieldTypes[0], INDEXED); + arrayWriter.writeValue(rowWriter, 0, outerArrayData); + + row.pointTo(rowWriter.segment(), 0, rowWriter.position()); + + InternalArray outerArray = row.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray nestedArray1 = outerArray.getArray(0); + assertThat(nestedArray1).isNotNull(); + assertThat(nestedArray1.size()).isEqualTo(3); + assertThat(nestedArray1.getInt(0)).isEqualTo(1); + assertThat(nestedArray1.getInt(1)).isEqualTo(2); + assertThat(nestedArray1.getInt(2)).isEqualTo(3); + + InternalArray nestedArray2 = outerArray.getArray(1); + assertThat(nestedArray2).isNotNull(); + assertThat(nestedArray2.size()).isEqualTo(3); + assertThat(nestedArray2.getInt(0)).isEqualTo(4); + assertThat(nestedArray2.getInt(1)).isEqualTo(5); + assertThat(nestedArray2.getInt(2)).isEqualTo(6); + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java index 3cb1834f1..662faca60 100644 --- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java @@ -104,6 +104,16 @@ public class DataTestUtils { return row; } + public static GenericRow row(RowType rowType, Object... objects) { + GenericRow row = new GenericRow(objects.length); + List<DataType> fieldTypes = rowType.getChildren(); + for (int i = 0; i < objects.length; i++) { + Object value = toInternalObject(objects[i], fieldTypes.get(i)); + row.setField(i, value); + } + return row; + } + private static Object toInternalObject(Object obj) { if (obj == null) { return null; @@ -124,6 +134,45 @@ public class DataTestUtils { } } + private static Object toInternalObject(Object obj, DataType dataType) { + if (obj == null) { + return null; + } + + DataTypeRoot typeRoot = dataType.getTypeRoot(); + + if (typeRoot == DataTypeRoot.ROW) { + if (obj instanceof Object[]) { + return row((RowType) dataType, (Object[]) obj); + } + throw new IllegalArgumentException( + "Expected Object[] for ROW type, but got: " + obj.getClass().getSimpleName()); + } + + if (typeRoot == DataTypeRoot.ARRAY) { + if (obj instanceof int[]) { + return new GenericArray((int[]) obj); + } + if (obj instanceof Object[]) { + DataType elementType = dataType.getChildren().get(0); + Object[] array = (Object[]) obj; + Object[] internalArray = new Object[array.length]; + for (int j = 0; j < array.length; j++) { + internalArray[j] = toInternalObject(array[j], elementType); + } + return new GenericArray(internalArray); + } + throw new IllegalArgumentException( + "Expected array for ARRAY type, but got: " + obj.getClass().getSimpleName()); + } + + if (obj instanceof String) { + return BinaryString.fromString((String) obj); + } + + return obj; + } + public static CompactedRow compactedRow(RowType rowType, Object[] objects) { return genCompacted(rowType, objects); } @@ -474,7 +523,7 @@ public class DataTestUtils { throws Exception { if (logFormat == LogFormat.ARROW) { List<InternalRow> rows = - objects.stream().map(DataTestUtils::row).collect(Collectors.toList()); + objects.stream().map(objs -> row(rowType, objs)).collect(Collectors.toList()); return createArrowMemoryLogRecords( rowType, offsetBase, diff --git a/fluss-filesystems/fluss-fs-obs/pom.xml b/fluss-filesystems/fluss-fs-obs/pom.xml index ccdd16c02..976cc3522 100644 --- a/fluss-filesystems/fluss-fs-obs/pom.xml +++ b/fluss-filesystems/fluss-fs-obs/pom.xml @@ -230,8 +230,6 @@ <exclude>.gitkeep</exclude> <exclude>mime.types</exclude> <exclude>mozilla/**</exclude> - <exclude>LICENSE.txt</exclude> - <exclude>license/LICENSE*</exclude> <exclude>okhttp3/internal/publicsuffix/NOTICE</exclude> <exclude>NOTICE</exclude> </excludes> diff --git a/fluss-filesystems/fluss-fs-oss/pom.xml b/fluss-filesystems/fluss-fs-oss/pom.xml index 7517e5ec8..0d2b4cf41 100644 --- a/fluss-filesystems/fluss-fs-oss/pom.xml +++ b/fluss-filesystems/fluss-fs-oss/pom.xml @@ -214,8 +214,6 @@ <exclude>.gitkeep</exclude> <exclude>mime.types</exclude> <exclude>mozilla/**</exclude> - <exclude>LICENSE.txt</exclude> - <exclude>license/LICENSE*</exclude> <exclude>okhttp3/internal/publicsuffix/NOTICE</exclude> <exclude>NOTICE</exclude> </excludes> diff --git a/fluss-flink/fluss-flink-1.18/pom.xml b/fluss-flink/fluss-flink-1.18/pom.xml index 66d359354..758dd91b1 100644 --- a/fluss-flink/fluss-flink-1.18/pom.xml +++ b/fluss-flink/fluss-flink-1.18/pom.xml @@ -219,14 +219,6 @@ <include>org.apache.fluss:fluss-client</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> </configuration> </execution> </executions> diff --git a/fluss-flink/fluss-flink-1.19/pom.xml b/fluss-flink/fluss-flink-1.19/pom.xml index 6cc58223f..4a100b438 100644 --- a/fluss-flink/fluss-flink-1.19/pom.xml +++ b/fluss-flink/fluss-flink-1.19/pom.xml @@ -213,14 +213,6 @@ <include>org.apache.fluss:fluss-client</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> </configuration> </execution> </executions> diff --git a/fluss-flink/fluss-flink-1.20/pom.xml b/fluss-flink/fluss-flink-1.20/pom.xml index b4ff8790f..68c43ed03 100644 --- a/fluss-flink/fluss-flink-1.20/pom.xml +++ b/fluss-flink/fluss-flink-1.20/pom.xml @@ -234,14 +234,6 @@ <include>org.apache.fluss:fluss-client</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> </configuration> </execution> </executions> diff --git a/fluss-flink/fluss-flink-2.1/pom.xml b/fluss-flink/fluss-flink-2.1/pom.xml index b03110c23..5b5475f3d 100644 --- a/fluss-flink/fluss-flink-2.1/pom.xml +++ b/fluss-flink/fluss-flink-2.1/pom.xml @@ -244,14 +244,6 @@ <include>org.apache.fluss:fluss-client</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> </configuration> </execution> </executions> diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java deleted file mode 100644 index e9ac4d0bb..000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.utils; - -import org.apache.fluss.row.InternalArray; -import org.apache.fluss.types.ArrayType; -import org.apache.fluss.types.DataType; - -import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericArrayData; -import org.apache.flink.table.data.MapData; -import org.apache.flink.table.data.RawValueData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; - -import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter; -import static org.apache.fluss.types.DataTypeChecks.getLength; -import static org.apache.fluss.types.DataTypeChecks.getPrecision; -import static org.apache.fluss.types.DataTypeChecks.getScale; - -/** Flink Array Converter. */ -public class FlinkArrayConverter implements ArrayData { - private final ArrayData arrayData; - - FlinkArrayConverter(DataType flussDataType, Object flussField) { - DataType eleType = ((ArrayType) flussDataType).getElementType(); - this.arrayData = copyArray((org.apache.fluss.row.InternalArray) flussField, eleType); - } - - private ArrayData copyArray(org.apache.fluss.row.InternalArray from, DataType eleType) { - FlussRowToFlinkRowConverter.FlussDeserializationConverter converter = - createInternalConverter(eleType); - if (!eleType.isNullable()) { - switch (eleType.getTypeRoot()) { - case BOOLEAN: - return new GenericArrayData(from.toBooleanArray()); - case TINYINT: - return new GenericArrayData(from.toByteArray()); - case SMALLINT: - return new GenericArrayData(from.toShortArray()); - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - return new GenericArrayData(from.toIntArray()); - case BIGINT: - return new GenericArrayData(from.toLongArray()); - case FLOAT: - return new GenericArrayData(from.toFloatArray()); - case DOUBLE: - return new GenericArrayData(from.toDoubleArray()); - } - } - - Object[] newArray = new Object[from.size()]; - - for (int i = 0; i < newArray.length; ++i) { - if (!from.isNullAt(i)) { - newArray[i] = converter.deserialize(getFieldValue(from, i, eleType)); - } else { - newArray[i] = null; - } - } - - return new GenericArrayData(newArray); - } - - @Override - public int size() { - return arrayData.size(); - } - - @Override - public boolean isNullAt(int i) { - return arrayData.isNullAt(i); - } - - @Override - public boolean getBoolean(int i) { - return arrayData.getBoolean(i); - } - - @Override - public byte getByte(int i) { - return arrayData.getByte(i); - } - - @Override - public short getShort(int i) { - return arrayData.getShort(i); - } - - @Override - public int getInt(int i) { - return arrayData.getInt(i); - } - - @Override - public long getLong(int i) { - return arrayData.getLong(i); - } - - @Override - public float getFloat(int i) { - return arrayData.getFloat(i); - } - - @Override - public double getDouble(int i) { - return arrayData.getDouble(i); - } - - @Override - public StringData getString(int i) { - return arrayData.getString(i); - } - - @Override - public DecimalData getDecimal(int i, int i1, int i2) { - return arrayData.getDecimal(i, i1, i2); - } - - @Override - public TimestampData getTimestamp(int i, int i1) { - return arrayData.getTimestamp(i, i1); - } - - @Override - public <T> RawValueData<T> getRawValue(int i) { - return arrayData.getRawValue(i); - } - - @Override - public byte[] getBinary(int i) { - return arrayData.getBinary(i); - } - - @Override - public ArrayData getArray(int i) { - return arrayData.getArray(i); - } - - @Override - public MapData getMap(int i) { - return arrayData.getMap(i); - } - - @Override - public RowData getRow(int i, int i1) { - return arrayData.getRow(i, i1); - } - - @Override - public boolean[] toBooleanArray() { - return arrayData.toBooleanArray(); - } - - @Override - public byte[] toByteArray() { - return arrayData.toByteArray(); - } - - @Override - public short[] toShortArray() { - return arrayData.toShortArray(); - } - - @Override - public int[] toIntArray() { - return arrayData.toIntArray(); - } - - @Override - public long[] toLongArray() { - return arrayData.toLongArray(); - } - - @Override - public float[] toFloatArray() { - return arrayData.toFloatArray(); - } - - @Override - public double[] toDoubleArray() { - return arrayData.toDoubleArray(); - } - - public ArrayData getArrayData() { - return arrayData; - } - - public static ArrayData deserialize(DataType flussDataType, Object flussField) { - return new FlinkArrayConverter(flussDataType, flussField).getArrayData(); - } - - private static Object getFieldValue(InternalArray array, int pos, DataType dataType) { - if (array.isNullAt(pos)) { - return null; - } - - switch (dataType.getTypeRoot()) { - case CHAR: - return array.getChar(pos, getLength(dataType)); - case STRING: - return array.getString(pos); - case BOOLEAN: - return array.getBoolean(pos); - case BINARY: - return array.getBinary(pos, getLength(dataType)); - case BYTES: - return array.getBytes(pos); - case DECIMAL: - return array.getDecimal(pos, getPrecision(dataType), getScale(dataType)); - case TINYINT: - return array.getByte(pos); - case SMALLINT: - return array.getShort(pos); - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - return array.getInt(pos); - case BIGINT: - return array.getLong(pos); - case FLOAT: - return array.getFloat(pos); - case DOUBLE: - return array.getDouble(pos); - case TIMESTAMP_WITHOUT_TIME_ZONE: - return array.getTimestampNtz(pos, getPrecision(dataType)); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return array.getTimestampLtz(pos, getPrecision(dataType)); - case ARRAY: - return array.getArray(pos); - // TODO: Add Map type support in future - case MAP: - throw new UnsupportedOperationException("Map type not supported yet"); - // TODO: Requires InternalArray.getRow() method from fluss-common Row type support - case ROW: - // return array.getRow(pos, getFieldCount(dataType)); - throw new UnsupportedOperationException("Row type requires fluss-common update"); - default: - throw new IllegalArgumentException("Unsupported data type: " + dataType); - } - } -} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkRowConverter.java deleted file mode 100644 index 279a96e23..000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkRowConverter.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.utils; - -import org.apache.fluss.row.InternalRow; -import org.apache.fluss.types.DataType; -import org.apache.fluss.types.RowType; - -import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.MapData; -import org.apache.flink.table.data.RawValueData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; -import org.apache.flink.types.RowKind; - -import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter; -import static org.apache.fluss.types.DataTypeChecks.getFieldCount; -import static org.apache.fluss.types.DataTypeChecks.getLength; -import static org.apache.fluss.types.DataTypeChecks.getPrecision; -import static org.apache.fluss.types.DataTypeChecks.getScale; - -/** FlinkRowConverter. */ -public class FlinkRowConverter implements RowData { - - private final RowData rowData; - - FlinkRowConverter(DataType eleType, Object flussField) { - this.rowData = copyInternalRow((InternalRow) flussField, (RowType) eleType); - } - - public RowData copyInternalRow(InternalRow row, RowType rowType) { - GenericRowData ret = new GenericRowData(row.getFieldCount()); - - for (int i = 0; i < row.getFieldCount(); ++i) { - DataType fieldType = rowType.getTypeAt(i); - FlussRowToFlinkRowConverter.FlussDeserializationConverter converter = - createInternalConverter(fieldType); - ret.setField(i, converter.deserialize(getFieldValue(row, i, fieldType))); - } - - return ret; - } - - @Override - public int getArity() { - return rowData.getArity(); - } - - @Override - public RowKind getRowKind() { - return rowData.getRowKind(); - } - - @Override - public void setRowKind(RowKind rowKind) { - rowData.setRowKind(rowKind); - } - - @Override - public boolean isNullAt(int i) { - return rowData.isNullAt(i); - } - - @Override - public boolean getBoolean(int i) { - return rowData.getBoolean(i); - } - - @Override - public byte getByte(int i) { - return rowData.getByte(i); - } - - @Override - public short getShort(int i) { - return rowData.getShort(i); - } - - @Override - public int getInt(int i) { - return rowData.getInt(i); - } - - @Override - public long getLong(int i) { - return rowData.getLong(i); - } - - @Override - public float getFloat(int i) { - return rowData.getFloat(i); - } - - @Override - public double getDouble(int i) { - return rowData.getDouble(i); - } - - @Override - public StringData getString(int i) { - return rowData.getString(i); - } - - @Override - public DecimalData getDecimal(int i, int i1, int i2) { - return rowData.getDecimal(i, i1, i2); - } - - @Override - public TimestampData getTimestamp(int i, int i1) { - return rowData.getTimestamp(i, i1); - } - - @Override - public <T> RawValueData<T> getRawValue(int i) { - return rowData.getRawValue(i); - } - - @Override - public byte[] getBinary(int i) { - return rowData.getBinary(i); - } - - @Override - public ArrayData getArray(int i) { - return rowData.getArray(i); - } - - @Override - public MapData getMap(int i) { - return rowData.getMap(i); - } - - @Override - public RowData getRow(int i, int i1) { - return rowData.getRow(i, i1); - } - - public RowData getRowData() { - return rowData; - } - - public static RowData deserialize(DataType flussDataType, Object flussField) { - return new FlinkRowConverter(flussDataType, flussField).getRowData(); - } - - private static Object getFieldValue(InternalRow row, int pos, DataType dataType) { - if (row.isNullAt(pos)) { - return null; - } - - switch (dataType.getTypeRoot()) { - case CHAR: - return row.getChar(pos, getLength(dataType)); - case STRING: - return row.getString(pos); - case BOOLEAN: - return row.getBoolean(pos); - case BINARY: - return row.getBinary(pos, getLength(dataType)); - case BYTES: - return row.getBytes(pos); - case DECIMAL: - return row.getDecimal(pos, getPrecision(dataType), getScale(dataType)); - case TINYINT: - return row.getByte(pos); - case SMALLINT: - return row.getShort(pos); - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - return row.getInt(pos); - case BIGINT: - return row.getLong(pos); - case FLOAT: - return row.getFloat(pos); - case DOUBLE: - return row.getDouble(pos); - case TIMESTAMP_WITHOUT_TIME_ZONE: - return row.getTimestampNtz(pos, getPrecision(dataType)); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return row.getTimestampLtz(pos, getPrecision(dataType)); - case ARRAY: - return row.getArray(pos); - // TODO: Add Map type support in future - case MAP: - throw new UnsupportedOperationException("Map type not supported yet"); - case ROW: - return row.getRow(pos, getFieldCount(dataType)); - default: - throw new IllegalArgumentException("Unsupported data type: " + dataType); - } - } -} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java index a0088f8e2..d27e66040 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java @@ -20,13 +20,16 @@ package org.apache.fluss.flink.utils; import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -72,12 +75,12 @@ public class FlussRowToFlinkRowConverter { /** * Create a nullable runtime {@link FlussDeserializationConverter} from given {@link DataType}. */ - protected FlussDeserializationConverter createNullableInternalConverter( + protected static FlussDeserializationConverter createNullableInternalConverter( DataType flussDataType) { return wrapIntoNullableInternalConverter(createInternalConverter(flussDataType)); } - protected FlussDeserializationConverter wrapIntoNullableInternalConverter( + protected static FlussDeserializationConverter wrapIntoNullableInternalConverter( FlussDeserializationConverter flussDeserializationConverter) { return val -> { if (val == null) { @@ -143,12 +146,43 @@ public class FlussRowToFlinkRowConverter { timestampLtz.getNanoOfMillisecond()); }; case ARRAY: - return (flussField) -> FlinkArrayConverter.deserialize(flussDataType, flussField); + ArrayType arrayType = (ArrayType) flussDataType; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(arrayType.getElementType()); + FlussDeserializationConverter elementConverter = + createNullableInternalConverter(arrayType.getElementType()); + return (flussField) -> { + InternalArray flussArray = (InternalArray) flussField; + int size = flussArray.size(); + Object[] flinkArray = new Object[size]; + for (int i = 0; i < size; i++) { + Object flussElement = elementGetter.getElementOrNull(flussArray, i); + flinkArray[i] = elementConverter.deserialize(flussElement); + } + return new GenericArrayData(flinkArray); + }; case MAP: // TODO: Add Map type support in future throw new UnsupportedOperationException("Map type not supported yet"); case ROW: - return (flussField) -> FlinkRowConverter.deserialize(flussDataType, flussField); + RowType rowType = (RowType) flussDataType; + int fieldCount = rowType.getFieldCount(); + InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fieldCount]; + FlussDeserializationConverter[] fieldConverters = + new FlussDeserializationConverter[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i); + fieldConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i)); + } + return (flussField) -> { + InternalRow flussRow = (InternalRow) flussField; + GenericRowData flinkRowData = new GenericRowData(fieldCount); + for (int i = 0; i < fieldCount; i++) { + Object flussFieldValue = fieldGetters[i].getFieldOrNull(flussRow); + flinkRowData.setField(i, fieldConverters[i].deserialize(flussFieldValue)); + } + return flinkRowData; + }; default: throw new UnsupportedOperationException("Unsupported data type: " + flussDataType); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java index efa408a7f..7df298387 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java @@ -278,6 +278,32 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase { assertResultsIgnoreOrder(collected, expected, true); } + @Test + void testRowTypesInLogTable() throws Exception { + tEnv.executeSql( + "create table row_log_test (" + + "id int, " + + "simple_row row<a int, b string>, " + + "nested_row row<x int, y row<z int, w string>, v string>, " + + "array_of_rows array<row<a int, b string>>" + + ") with ('bucket.num' = '3')"); + + tEnv.executeSql( + "INSERT INTO row_log_test VALUES " + + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 'nested'), 'row1'), " + + "ARRAY[ROW(1, 'a'), ROW(2, 'b')]), " + + "(2, ROW(40, 'world'), ROW(50, ROW(60, 'test'), 'row2'), " + + "ARRAY[ROW(3, 'c')])") + .await(); + + CloseableIterator<Row> rowIter = tEnv.executeSql("select * from row_log_test").collect(); + List<String> expectedRows = + Arrays.asList( + "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]]]", + "+I[2, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]]]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + @Test void testExceptionsForArrayTypeUsage() { assertThatThrownBy( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java index 05261968f..c9379d8c1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java @@ -35,6 +35,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDateTime; +import static org.apache.flink.table.data.binary.BinaryStringData.fromString; import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType; import static org.apache.fluss.row.TestInternalRowGenerator.createAllRowType; import static org.apache.fluss.row.indexed.IndexedRowTest.assertAllTypeEquals; @@ -51,7 +52,7 @@ public class FlinkRowToFlussRowConverterTest { try (FlinkRowToFlussRowConverter converter = FlinkRowToFlussRowConverter.create(toFlinkRowType(flussRowType))) { InternalRow internalRow = converter.toInternalRow(genRowDataForAllType()); - assertThat(internalRow.getFieldCount()).isEqualTo(21); + assertThat(internalRow.getFieldCount()).isEqualTo(23); assertAllTypeEquals(internalRow); } @@ -60,13 +61,13 @@ public class FlinkRowToFlussRowConverterTest { FlinkRowToFlussRowConverter.create( toFlinkRowType(flussRowType), KvFormat.COMPACTED)) { InternalRow internalRow = converter.toInternalRow(genRowDataForAllType()); - assertThat(internalRow.getFieldCount()).isEqualTo(21); + assertThat(internalRow.getFieldCount()).isEqualTo(23); assertAllTypeEquals(internalRow); } } private static RowData genRowDataForAllType() { - GenericRowData genericRowData = new GenericRowData(21); + GenericRowData genericRowData = new GenericRowData(23); genericRowData.setField(0, true); genericRowData.setField(1, (byte) 2); genericRowData.setField(2, Short.parseShort("10")); @@ -94,10 +95,24 @@ public class FlinkRowToFlussRowConverterTest { // 19: array genericRowData.setField( 19, new GenericArrayData(new Integer[] {1, 2, 3, 4, 5, -11, null, 444, 102234})); + genericRowData.setField( + 20, + new GenericArrayData( + new float[] {0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE})); + genericRowData.setField( + 21, + new GenericArrayData( + new GenericArrayData[] { + new GenericArrayData( + new StringData[] {fromString("a"), null, fromString("c")}), + null, + new GenericArrayData( + new StringData[] {fromString("hello"), fromString("world")}) + })); - // 20: row (nested row with fields: u1: INT, u2: ROW(v1: INT), u3: STRING) + // 22: row (nested row with fields: u1: INT, u2: ROW(v1: INT), u3: STRING) genericRowData.setField( - 20, GenericRowData.of(123, GenericRowData.of(20), StringData.fromString("Test"))); + 22, GenericRowData.of(123, GenericRowData.of(22), StringData.fromString("Test"))); return genericRowData; } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java index a149a7aea..06e48ffa2 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java @@ -20,6 +20,7 @@ package org.apache.fluss.flink.utils; import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.flink.row.FlinkAsFlussArray; import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.indexed.IndexedRow; import org.apache.fluss.row.indexed.IndexedRowWriter; import org.apache.fluss.types.DataType; @@ -36,6 +37,7 @@ import java.math.BigInteger; import java.time.LocalDate; import java.time.LocalTime; +import static org.apache.fluss.row.BinaryString.fromString; import static org.apache.fluss.row.TestInternalRowGenerator.createAllRowType; import static org.apache.fluss.row.TestInternalRowGenerator.createAllTypes; import static org.apache.fluss.row.indexed.IndexedRowTest.genRecordForAllTypes; @@ -100,12 +102,35 @@ class FlussRowToFlinkRowConverterTest { new FlinkAsFlussArray(flinkRow.getArray(19)).toObjectArray(DataTypes.INT()); assertThat(array1).isEqualTo(new Integer[] {1, 2, 3, 4, 5, -11, null, 444, 102234}); + // array of float + Float[] array2 = + new FlinkAsFlussArray(flinkRow.getArray(20)).toObjectArray(DataTypes.FLOAT()); + assertThat(array2) + .isEqualTo( + new Float[] { + 0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE + }); + + // array of string + assertThat(flinkRow.getArray(21).size()).isEqualTo(3); + BinaryString[] stringArray1 = + new FlinkAsFlussArray(flinkRow.getArray(21).getArray(0)) + .toObjectArray(DataTypes.STRING()); + assertThat(stringArray1) + .isEqualTo(new BinaryString[] {fromString("a"), null, fromString("c")}); + assertThat(flinkRow.getArray(21).isNullAt(1)).isTrue(); + BinaryString[] stringArray2 = + new FlinkAsFlussArray(flinkRow.getArray(21).getArray(2)) + .toObjectArray(DataTypes.STRING()); + assertThat(stringArray2) + .isEqualTo(new BinaryString[] {fromString("hello"), fromString("world")}); + // nested row: ROW<u1: INT, u2: ROW<v1: INT>, u3: STRING> - assertThat(flinkRow.getRow(20, 3)).isNotNull(); - assertThat(flinkRow.getRow(20, 3).getInt(0)).isEqualTo(123); - assertThat(flinkRow.getRow(20, 3).getRow(1, 1)).isNotNull(); - assertThat(flinkRow.getRow(20, 3).getRow(1, 1).getInt(0)).isEqualTo(20); - assertThat(flinkRow.getRow(20, 3).getString(2).toString()).isEqualTo("Test"); + assertThat(flinkRow.getRow(22, 3)).isNotNull(); + assertThat(flinkRow.getRow(22, 3).getInt(0)).isEqualTo(123); + assertThat(flinkRow.getRow(22, 3).getRow(1, 1)).isNotNull(); + assertThat(flinkRow.getRow(22, 3).getRow(1, 1).getInt(0)).isEqualTo(22); + assertThat(flinkRow.getRow(22, 3).getString(2).toString()).isEqualTo("Test"); } } } diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java index ac1755d44..dd76bd2ec 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java @@ -157,7 +157,6 @@ public class IcebergRecordAsFlussRow implements InternalRow { @Override public InternalRow getRow(int pos, int numFields) { - // TODO: Support Row type conversion from Iceberg to Fluss - return null; + throw new UnsupportedOperationException(); } } diff --git a/fluss-lake/fluss-lake-lance/pom.xml b/fluss-lake/fluss-lake-lance/pom.xml index c4c35093f..058554dd3 100644 --- a/fluss-lake/fluss-lake-lance/pom.xml +++ b/fluss-lake/fluss-lake-lance/pom.xml @@ -208,14 +208,6 @@ <include>*:*</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> </configuration> </execution> </executions> diff --git a/fluss-server/pom.xml b/fluss-server/pom.xml index 3bd653f93..85e18b2c4 100644 --- a/fluss-server/pom.xml +++ b/fluss-server/pom.xml @@ -133,14 +133,6 @@ <include>*:*</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>*</artifact> - <excludes> - <exclude>LICENSE*</exclude> - </excludes> - </filter> - </filters> <relocations> <relocation> <pattern>org.apache.commons</pattern> diff --git a/pom.xml b/pom.xml index 247040141..80d4957c8 100644 --- a/pom.xml +++ b/pom.xml @@ -865,6 +865,9 @@ <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> + <!-- Exclude Eclipse license files from dependencies --> + <exclude>LICENSE-EPL-1.0.txt</exclude> + <exclude>LICENSE-EDL-1.0.txt</exclude> <!-- META-INF/maven can contain 2 things: - For archetypes, it contains an archetype-metadata.xml. - For other jars, it contains the pom for all dependencies under the respective <groupId>/<artifactId>/ directory.
