This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 9a6e4c846de39e778c88ca0fca6b45975039b326
Author: Junbo Wang <[email protected]>
AuthorDate: Wed Feb 11 20:58:26 2026 +0800

    [lake/iceberg] support RowType conversion in FlussArrayAsIcebergList (#2649)
---
 .../iceberg/source/FlussArrayAsIcebergList.java    | 13 ++++++
 .../source/FlussRowAsIcebergRecordTest.java        | 46 +++++++++++++++++++---
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
index 285c8b1c5..c05cbd751 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
@@ -17,8 +17,10 @@
 
 package org.apache.fluss.lake.iceberg.source;
 
+import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
 import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BigIntType;
 import org.apache.fluss.types.BinaryType;
@@ -33,6 +35,7 @@ import org.apache.fluss.types.FloatType;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
 import org.apache.fluss.types.SmallIntType;
 import org.apache.fluss.types.StringType;
 import org.apache.fluss.types.TimeType;
@@ -40,6 +43,8 @@ import org.apache.fluss.types.TimestampType;
 import org.apache.fluss.types.TinyIntType;
 import org.apache.fluss.utils.DateTimeUtils;
 
+import org.apache.iceberg.types.Types;
+
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.OffsetDateTime;
@@ -113,6 +118,14 @@ public class FlussArrayAsIcebergList extends AbstractList<Object> {
                     ? null
                     : new FlussMapAsIcebergMap(
                             internalMap, mapType.getKeyType(), 
mapType.getValueType());
+        } else if (elementType instanceof RowType) {
+            RowType rowType = (RowType) elementType;
+            InternalRow internalRow = flussArray.getRow(index, 
rowType.getFieldCount());
+            Types.StructType nestedStructType =
+                    (Types.StructType) 
rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
+            return internalRow == null
+                    ? null
+                    : new FlussRowAsIcebergRecord(nestedStructType, rowType, 
internalRow);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported array element type conversion for Fluss type: 
"
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
index 720bc2991..8e35c38af 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -336,13 +336,24 @@ class FlussRowAsIcebergRecordTest {
                                                 "ids",
                                                 Types.ListType.ofRequired(
                                                         10, 
Types.IntegerType.get())))),
+                        // array of row (ARRAY<ROW<type STRING, value STRING>>)
+                        Types.NestedField.required(
+                                11,
+                                "contacts",
+                                Types.ListType.ofRequired(
+                                        12,
+                                        Types.StructType.of(
+                                                Types.NestedField.required(
+                                                        13, "type", 
Types.StringType.get()),
+                                                Types.NestedField.required(
+                                                        14, "value", 
Types.StringType.get())))),
                         // nullable row
                         Types.NestedField.optional(
-                                11,
+                                15,
                                 "nullable_row",
                                 Types.StructType.of(
                                         Types.NestedField.required(
-                                                12, "id", 
Types.IntegerType.get()))));
+                                                16, "id", 
Types.IntegerType.get()))));
 
         RowType flussRowType =
                 RowType.of(
@@ -360,10 +371,15 @@ class FlussRowAsIcebergRecordTest {
                                                 DataTypes.FIELD("flag", 
DataTypes.BOOLEAN())))),
                         // row_with array
                         DataTypes.ROW(DataTypes.FIELD("ids", 
DataTypes.ARRAY(DataTypes.INT()))),
+                        // array of row
+                        DataTypes.ARRAY(
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("type", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("value", 
DataTypes.STRING()))),
                         // nullable row
                         DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT())));
 
-        GenericRow genericRow = new GenericRow(4);
+        GenericRow genericRow = new GenericRow(5);
 
         // Simple Row
         GenericRow simpleRow = new GenericRow(2);
@@ -385,8 +401,17 @@ class FlussRowAsIcebergRecordTest {
         rowWithArray.setField(0, new GenericArray(new int[] {1, 2, 3}));
         genericRow.setField(2, rowWithArray);
 
+        // Array of Row
+        GenericRow contact1 = new GenericRow(2);
+        contact1.setField(0, BinaryString.fromString("email"));
+        contact1.setField(1, BinaryString.fromString("[email protected]"));
+        GenericRow contact2 = new GenericRow(2);
+        contact2.setField(0, BinaryString.fromString("phone"));
+        contact2.setField(1, BinaryString.fromString("123-456-7890"));
+        genericRow.setField(3, new GenericArray(new Object[] {contact1, 
contact2}));
+
         // Nullable Row
-        genericRow.setField(3, null);
+        genericRow.setField(4, null);
 
         FlussRowAsIcebergRecord record = new 
FlussRowAsIcebergRecord(structType, flussRowType);
         record.internalRow = genericRow;
@@ -411,8 +436,19 @@ class FlussRowAsIcebergRecordTest {
         assertThat(ids.get(1)).isEqualTo(2);
         assertThat(ids.get(2)).isEqualTo(3);
 
+        // Verify Array of Row (ARRAY<ROW<type STRING, value STRING>>)
+        List<?> contacts = (List<?>) record.get(3);
+        assertThat(contacts).isNotNull();
+        assertThat(contacts).hasSize(2);
+        Record icebergContact1 = (Record) contacts.get(0);
+        assertThat(icebergContact1.get(0)).isEqualTo("email");
+        assertThat(icebergContact1.get(1)).isEqualTo("[email protected]");
+        Record icebergContact2 = (Record) contacts.get(1);
+        assertThat(icebergContact2.get(0)).isEqualTo("phone");
+        assertThat(icebergContact2.get(1)).isEqualTo("123-456-7890");
+
         // Verify Nullable Row
-        assertThat(record.get(3)).isNull();
+        assertThat(record.get(4)).isNull();
     }
 
     @SuppressWarnings("unchecked")

Reply via email to