This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 90a3be3b1 [lake/iceberg] support RowType conversion in
FlussArrayAsIcebergList (#2649)
90a3be3b1 is described below
commit 90a3be3b1fe206aeeb90347601e03c2f6fa5d0c7
Author: Junbo Wang <[email protected]>
AuthorDate: Wed Feb 11 20:58:26 2026 +0800
[lake/iceberg] support RowType conversion in FlussArrayAsIcebergList (#2649)
---
.../iceberg/source/FlussArrayAsIcebergList.java | 13 ++++++
.../source/FlussRowAsIcebergRecordTest.java | 46 +++++++++++++++++++---
2 files changed, 54 insertions(+), 5 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
index 285c8b1c5..c05cbd751 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
@@ -17,8 +17,10 @@
package org.apache.fluss.lake.iceberg.source;
+import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
@@ -33,6 +35,7 @@ import org.apache.fluss.types.FloatType;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.LocalZonedTimestampType;
import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
import org.apache.fluss.types.SmallIntType;
import org.apache.fluss.types.StringType;
import org.apache.fluss.types.TimeType;
@@ -40,6 +43,8 @@ import org.apache.fluss.types.TimestampType;
import org.apache.fluss.types.TinyIntType;
import org.apache.fluss.utils.DateTimeUtils;
+import org.apache.iceberg.types.Types;
+
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
@@ -113,6 +118,14 @@ public class FlussArrayAsIcebergList extends
AbstractList<Object> {
? null
: new FlussMapAsIcebergMap(
internalMap, mapType.getKeyType(),
mapType.getValueType());
+ } else if (elementType instanceof RowType) {
+ RowType rowType = (RowType) elementType;
+ InternalRow internalRow = flussArray.getRow(index,
rowType.getFieldCount());
+ Types.StructType nestedStructType =
+ (Types.StructType)
rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
+ return internalRow == null
+ ? null
+ : new FlussRowAsIcebergRecord(nestedStructType, rowType,
internalRow);
} else {
throw new UnsupportedOperationException(
"Unsupported array element type conversion for Fluss type:
"
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
index 720bc2991..8e35c38af 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -336,13 +336,24 @@ class FlussRowAsIcebergRecordTest {
"ids",
Types.ListType.ofRequired(
10,
Types.IntegerType.get())))),
+ // array of row (ARRAY<ROW<type STRING, value STRING>>)
+ Types.NestedField.required(
+ 11,
+ "contacts",
+ Types.ListType.ofRequired(
+ 12,
+ Types.StructType.of(
+ Types.NestedField.required(
+ 13, "type",
Types.StringType.get()),
+ Types.NestedField.required(
+ 14, "value",
Types.StringType.get())))),
// nullable row
Types.NestedField.optional(
- 11,
+ 15,
"nullable_row",
Types.StructType.of(
Types.NestedField.required(
- 12, "id",
Types.IntegerType.get()))));
+ 16, "id",
Types.IntegerType.get()))));
RowType flussRowType =
RowType.of(
@@ -360,10 +371,15 @@ class FlussRowAsIcebergRecordTest {
DataTypes.FIELD("flag",
DataTypes.BOOLEAN())))),
// row_with array
DataTypes.ROW(DataTypes.FIELD("ids",
DataTypes.ARRAY(DataTypes.INT()))),
+ // array of row
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("type",
DataTypes.STRING()),
+ DataTypes.FIELD("value",
DataTypes.STRING()))),
// nullable row
DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT())));
- GenericRow genericRow = new GenericRow(4);
+ GenericRow genericRow = new GenericRow(5);
// Simple Row
GenericRow simpleRow = new GenericRow(2);
@@ -385,8 +401,17 @@ class FlussRowAsIcebergRecordTest {
rowWithArray.setField(0, new GenericArray(new int[] {1, 2, 3}));
genericRow.setField(2, rowWithArray);
+ // Array of Row
+ GenericRow contact1 = new GenericRow(2);
+ contact1.setField(0, BinaryString.fromString("email"));
+ contact1.setField(1, BinaryString.fromString("[email protected]"));
+ GenericRow contact2 = new GenericRow(2);
+ contact2.setField(0, BinaryString.fromString("phone"));
+ contact2.setField(1, BinaryString.fromString("123-456-7890"));
+ genericRow.setField(3, new GenericArray(new Object[] {contact1,
contact2}));
+
// Nullable Row
- genericRow.setField(3, null);
+ genericRow.setField(4, null);
FlussRowAsIcebergRecord record = new
FlussRowAsIcebergRecord(structType, flussRowType);
record.internalRow = genericRow;
@@ -411,8 +436,19 @@ class FlussRowAsIcebergRecordTest {
assertThat(ids.get(1)).isEqualTo(2);
assertThat(ids.get(2)).isEqualTo(3);
+ // Verify Array of Row (ARRAY<ROW<type STRING, value STRING>>)
+ List<?> contacts = (List<?>) record.get(3);
+ assertThat(contacts).isNotNull();
+ assertThat(contacts).hasSize(2);
+ Record icebergContact1 = (Record) contacts.get(0);
+ assertThat(icebergContact1.get(0)).isEqualTo("email");
+ assertThat(icebergContact1.get(1)).isEqualTo("[email protected]");
+ Record icebergContact2 = (Record) contacts.get(1);
+ assertThat(icebergContact2.get(0)).isEqualTo("phone");
+ assertThat(icebergContact2.get(1)).isEqualTo("123-456-7890");
+
// Verify Nullable Row
- assertThat(record.get(3)).isNull();
+ assertThat(record.get(4)).isNull();
}
@SuppressWarnings("unchecked")