wuchong commented on code in PR #2079:
URL: https://github.com/apache/fluss/pull/2079#discussion_r2640519748
##########
fluss-common/pom.xml:
##########
@@ -62,7 +62,14 @@
<artifactId>fluss-shaded-arrow</artifactId>
</dependency>
- <!-- TODO: these two dependencies need to be shaded. -->
+ <!-- TODO: these three dependencies need to be shaded. -->
+ <!-- Use the Arrow compatible version -->
+ <dependency>
+ <groupId>org.eclipse.collections</groupId>
+ <artifactId>eclipse-collections</artifactId>
+ <version>11.1.0</version>
Review Comment:
We don't need this; it should be removed.
##########
fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java:
##########
@@ -319,6 +326,103 @@ private void writeVarLengthToVarLengthList(int length) {
//
------------------------------------------------------------------------------------------
+ /**
+ * Creates an accessor for writing the elements of an indexed row writer during runtime.
+ *
+ * @param fieldType the field type of the indexed row
+ */
+ public static FieldWriter createFieldWriter(DataType fieldType) {
Review Comment:
This is a duplicated code block; it can be replaced by
`BinaryWriter.createValueWriter(fieldType)`.
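A minimal sketch of the suggested replacement (the `BinaryWriter.ValueWriter` shape and the `writeValue` call are assumptions, not confirmed by this PR):

```java
// Hedged sketch: delegate to the shared factory instead of duplicating the
// per-type switch here. The exact BinaryWriter.ValueWriter / FieldWriter
// signatures are assumptions.
public static FieldWriter createFieldWriter(DataType fieldType) {
    BinaryWriter.ValueWriter valueWriter = BinaryWriter.createValueWriter(fieldType);
    return (writer, pos, value) -> valueWriter.writeValue(writer, pos, value);
}
```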
##########
fluss-client/pom.xml:
##########
@@ -113,6 +113,14 @@
<include>*:*</include>
</includes>
</artifactSet>
+ <filters>
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>LICENSE*</exclude>
Review Comment:
Why exclude all the LICENSE files?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter;
+import static org.apache.fluss.types.DataTypeChecks.getLength;
+import static org.apache.fluss.types.DataTypeChecks.getPrecision;
+import static org.apache.fluss.types.DataTypeChecks.getScale;
+
+/** Flink Array Converter. */
+public class FlinkArrayConverter implements ArrayData {
+ private final ArrayData arrayData;
+
+ FlinkArrayConverter(DataType flussDataType, Object flussField) {
+ DataType eleType = ((ArrayType) flussDataType).getElementType();
+ this.arrayData = copyArray((org.apache.fluss.row.InternalArray) flussField, eleType);
+ }
+
+ private ArrayData copyArray(org.apache.fluss.row.InternalArray from, DataType eleType) {
+ FlussRowToFlinkRowConverter.FlussDeserializationConverter converter =
+ createInternalConverter(eleType);
+ if (!eleType.isNullable()) {
+ switch (eleType.getTypeRoot()) {
+ case BOOLEAN:
+ return new GenericArrayData(from.toBooleanArray());
+ case TINYINT:
+ return new GenericArrayData(from.toByteArray());
+ case SMALLINT:
+ return new GenericArrayData(from.toShortArray());
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return new GenericArrayData(from.toIntArray());
+ case BIGINT:
+ return new GenericArrayData(from.toLongArray());
+ case FLOAT:
+ return new GenericArrayData(from.toFloatArray());
+ case DOUBLE:
+ return new GenericArrayData(from.toDoubleArray());
+ }
+ }
+
+ Object[] newArray = new Object[from.size()];
+
+ for (int i = 0; i < newArray.length; ++i) {
+ if (!from.isNullAt(i)) {
+ newArray[i] = converter.deserialize(getFieldValue(from, i, eleType));
+ } else {
+ newArray[i] = null;
+ }
+ }
+
+ return new GenericArrayData(newArray);
+ }
+
+ @Override
+ public int size() {
+ return arrayData.size();
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return arrayData.isNullAt(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return arrayData.getBoolean(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return arrayData.getByte(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ return arrayData.getShort(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ return arrayData.getInt(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ return arrayData.getLong(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return arrayData.getFloat(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return arrayData.getDouble(i);
+ }
+
+ @Override
+ public StringData getString(int i) {
+ return arrayData.getString(i);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int i1, int i2) {
+ return arrayData.getDecimal(i, i1, i2);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int i, int i1) {
+ return arrayData.getTimestamp(i, i1);
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int i) {
+ return arrayData.getRawValue(i);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return arrayData.getBinary(i);
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ return arrayData.getArray(i);
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ return arrayData.getMap(i);
+ }
+
+ @Override
+ public RowData getRow(int i, int i1) {
+ return arrayData.getRow(i, i1);
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ return arrayData.toBooleanArray();
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return arrayData.toByteArray();
+ }
+
+ @Override
+ public short[] toShortArray() {
+ return arrayData.toShortArray();
+ }
+
+ @Override
+ public int[] toIntArray() {
+ return arrayData.toIntArray();
+ }
+
+ @Override
+ public long[] toLongArray() {
+ return arrayData.toLongArray();
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ return arrayData.toFloatArray();
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ return arrayData.toDoubleArray();
+ }
+
+ public ArrayData getArrayData() {
+ return arrayData;
+ }
+
+ public static ArrayData deserialize(DataType flussDataType, Object flussField) {
+ return new FlinkArrayConverter(flussDataType, flussField).getArrayData();
+ }
+
+ private static Object getFieldValue(InternalArray array, int pos, DataType dataType) {
Review Comment:
We don't need to reimplement the element getter here; just use
`InternalArray.createElementGetter`.
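For reference, a hedged sketch of the copy loop using the shared getter (mirroring the original pattern shown in `FlussRowToFlinkRowConverter` below):

```java
// Sketch: reuse InternalArray.createElementGetter instead of a hand-rolled
// getFieldValue(); null elements pass through unchanged.
InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(eleType);
for (int i = 0; i < newArray.length; i++) {
    Object element = elementGetter.getElementOrNull(from, i);
    newArray[i] = element == null ? null : converter.deserialize(element);
}
```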
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter;
+import static org.apache.fluss.types.DataTypeChecks.getLength;
+import static org.apache.fluss.types.DataTypeChecks.getPrecision;
+import static org.apache.fluss.types.DataTypeChecks.getScale;
+
+/** Flink Array Converter. */
+public class FlinkArrayConverter implements ArrayData {
+ private final ArrayData arrayData;
+
+ FlinkArrayConverter(DataType flussDataType, Object flussField) {
+ DataType eleType = ((ArrayType) flussDataType).getElementType();
+ this.arrayData = copyArray((org.apache.fluss.row.InternalArray) flussField, eleType);
+ }
+
+ private ArrayData copyArray(org.apache.fluss.row.InternalArray from, DataType eleType) {
+ FlussRowToFlinkRowConverter.FlussDeserializationConverter converter =
+ createInternalConverter(eleType);
+ if (!eleType.isNullable()) {
+ switch (eleType.getTypeRoot()) {
+ case BOOLEAN:
+ return new GenericArrayData(from.toBooleanArray());
+ case TINYINT:
+ return new GenericArrayData(from.toByteArray());
+ case SMALLINT:
+ return new GenericArrayData(from.toShortArray());
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return new GenericArrayData(from.toIntArray());
+ case BIGINT:
+ return new GenericArrayData(from.toLongArray());
+ case FLOAT:
+ return new GenericArrayData(from.toFloatArray());
+ case DOUBLE:
+ return new GenericArrayData(from.toDoubleArray());
+ }
+ }
Review Comment:
Actually, the nullability information in Flink's `DataType` is not always
reliable. To ensure robustness, we should wrap this code block in a try-catch
and fall back to using the element `converter` if an exception occurs. This
guarantees safe execution even when nullability metadata is inaccurate.
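Roughly what that could look like (a sketch, assuming the primitive `toXxxArray()` calls fail with a `RuntimeException` when the data unexpectedly contains nulls):

```java
// Hedged sketch: treat the non-nullable fast path as best-effort and fall
// back to the converter-based copy if the nullability metadata was wrong.
if (!eleType.isNullable()) {
    try {
        switch (eleType.getTypeRoot()) {
            case BOOLEAN:
                return new GenericArrayData(from.toBooleanArray());
            // ... remaining primitive cases as above ...
            default:
                break;
        }
    } catch (RuntimeException e) {
        // Nullability metadata was inaccurate (e.g. the array actually
        // contains nulls); fall through to the element-converter loop below.
    }
}
```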
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java:
##########
@@ -146,21 +143,12 @@ private FlussDeserializationConverter createInternalConverter(DataType flussData
timestampLtz.getNanoOfMillisecond());
};
case ARRAY:
- ArrayType arrayType = (ArrayType) flussDataType;
- InternalArray.ElementGetter elementGetter =
- InternalArray.createElementGetter(arrayType.getElementType());
- FlussDeserializationConverter elementConverter =
- createNullableInternalConverter(arrayType.getElementType());
- return (flussField) -> {
- InternalArray flussArray = (InternalArray) flussField;
- int size = flussArray.size();
- Object[] flinkArray = new Object[size];
- for (int i = 0; i < size; i++) {
- Object flussElement = elementGetter.getElementOrNull(flussArray, i);
- flinkArray[i] = elementConverter.deserialize(flussElement);
- }
- return new GenericArrayData(flinkArray);
- };
+ return (flussField) -> FlinkArrayConverter.deserialize(flussDataType, flussField);
+ case MAP:
+ // TODO: Add Map type support in future
+ throw new UnsupportedOperationException("Map type not supported yet");
+ case ROW:
+ return (flussField) -> FlinkRowConverter.deserialize(flussDataType, flussField);
Review Comment:
Actually, I still prefer the original implementation of the conversion, which
is very clean thanks to `elementGetter` and `elementConverter`. Introducing
`FlinkArrayConverter` and `FlinkRowConverter` seems unnecessary, and they don't
need to implement the `ArrayData` and `RowData` interfaces.
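If ROW support is still needed, it could follow the same inline pattern; a hedged sketch, assuming `InternalRow.createFieldGetter` and `RowType#getTypeAt` exist analogously to the array-side APIs (assumptions, not confirmed in this PR):

```java
// Hedged sketch of an inline ROW branch in the style of the original ARRAY
// implementation; the field-getter APIs are assumed, not confirmed.
case ROW:
    RowType rowType = (RowType) flussDataType;
    int arity = rowType.getFieldCount();
    InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[arity];
    FlussDeserializationConverter[] fieldConverters = new FlussDeserializationConverter[arity];
    for (int i = 0; i < arity; i++) {
        fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
        fieldConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
    }
    return (flussField) -> {
        InternalRow flussRow = (InternalRow) flussField;
        GenericRowData flinkRow = new GenericRowData(arity);
        for (int i = 0; i < arity; i++) {
            flinkRow.setField(
                    i, fieldConverters[i].deserialize(fieldGetters[i].getFieldOrNull(flussRow)));
        }
        return flinkRow;
    };
```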
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java:
##########
@@ -91,22 +90,15 @@ private static RowData genRowDataForAllType() {
genericRowData.setField(
18,
TimestampData.fromLocalDateTime(LocalDateTime.parse("2023-10-25T12:01:13.182")));
+
+ // 19: array
genericRowData.setField(
19, new GenericArrayData(new Integer[] {1, 2, 3, 4, 5, -11, null, 444, 102234}));
+
+ // 20: row (nested row with fields: u1: INT, u2: ROW(v1: INT), u3: STRING)
genericRowData.setField(
- 20,
- new GenericArrayData(
- new float[] {0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE}));
- genericRowData.setField(
- 21,
- new GenericArrayData(
- new GenericArrayData[] {
- new GenericArrayData(
- new StringData[] {fromString("a"), null, fromString("c")}),
- null,
- new GenericArrayData(
- new StringData[] {fromString("hello"), fromString("world")})
- }));
Review Comment:
Why remove the original array test data? I think it is useful to our tests.
##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java:
##########
@@ -155,6 +155,9 @@ public InternalArray getArray(int pos) {
throw new UnsupportedOperationException();
}
- // TODO: Support Map type conversion from Iceberg to Fluss
- // TODO: Support Row type conversion from Iceberg to Fluss
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ // TODO: Support Row type conversion from Iceberg to Fluss
+ return null;
Review Comment:
This should throw `UnsupportedOperationException` instead of returning `null`.
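A minimal sketch of the fix, mirroring how `getArray` already behaves in this class:

```java
@Override
public InternalRow getRow(int pos, int numFields) {
    // TODO: Support Row type conversion from Iceberg to Fluss
    throw new UnsupportedOperationException();
}
```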
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java:
##########
@@ -102,28 +100,12 @@ void testConverter() throws Exception {
new FlinkAsFlussArray(flinkRow.getArray(19)).toObjectArray(DataTypes.INT());
assertThat(array1).isEqualTo(new Integer[] {1, 2, 3, 4, 5, -11, null, 444, 102234});
- // array of float
- Float[] array2 =
- new FlinkAsFlussArray(flinkRow.getArray(20)).toObjectArray(DataTypes.FLOAT());
- assertThat(array2)
- .isEqualTo(
- new Float[] {
- 0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE
- });
-
- // array of string
- assertThat(flinkRow.getArray(21).size()).isEqualTo(3);
- BinaryString[] stringArray1 =
- new FlinkAsFlussArray(flinkRow.getArray(21).getArray(0))
- .toObjectArray(DataTypes.STRING());
- assertThat(stringArray1)
- .isEqualTo(new BinaryString[] {fromString("a"), null, fromString("c")});
- assertThat(flinkRow.getArray(21).isNullAt(1)).isTrue();
- BinaryString[] stringArray2 =
- new FlinkAsFlussArray(flinkRow.getArray(21).getArray(2))
- .toObjectArray(DataTypes.STRING());
- assertThat(stringArray2)
- .isEqualTo(new BinaryString[] {fromString("hello"), fromString("world")});
Review Comment:
Ditto. These shouldn't be removed.
##########
fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java:
##########
@@ -62,20 +62,14 @@ public static RowType createAllRowType() {
new DataField("f17", DataTypes.TIMESTAMP_LTZ(1)),
new DataField("f18", DataTypes.TIMESTAMP_LTZ(5)),
new DataField("f19", DataTypes.ARRAY(DataTypes.INT())),
+ // TODO: Add Map and Row fields in Issue #1973
new DataField(
"f20",
- DataTypes.ARRAY(DataTypes.FLOAT().copy(false))), // vector embedding type
- new DataField(
- "f21",
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))) // nested array
Review Comment:
Why remove the nested array test data? I think it is very useful.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]