This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b74d6b30 [lake/iceberg] Support nested row types for Iceberg tiering 
(#2278)
3b74d6b30 is described below

commit 3b74d6b302217b208394330bd9fcb447c4e40543
Author: SeungMin <[email protected]>
AuthorDate: Fri Jan 9 15:32:09 2026 +0900

    [lake/iceberg] Support nested row types for Iceberg tiering (#2278)
---
 .../iceberg/FlussDataTypeToIcebergDataType.java    |  28 ++++-
 .../iceberg/source/FlussRowAsIcebergRecord.java    |  10 ++
 .../iceberg/source/IcebergRecordAsFlussRow.java    |  19 +++-
 .../lake/iceberg/utils/IcebergConversions.java     |  11 ++
 .../flink/FlinkUnionReadLogTableITCase.java        |  19 +++-
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java |  54 +++++++---
 .../source/FlussRowAsIcebergRecordTest.java        | 120 +++++++++++++++++++++
 .../source/IcebergRecordAsFlussRowTest.java        |  58 +++++++++-
 8 files changed, 294 insertions(+), 25 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index 1c093482a..bd01d08af 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -23,6 +23,7 @@ import org.apache.fluss.types.BinaryType;
 import org.apache.fluss.types.BooleanType;
 import org.apache.fluss.types.BytesType;
 import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataField;
 import org.apache.fluss.types.DataTypeVisitor;
 import org.apache.fluss.types.DateType;
 import org.apache.fluss.types.DecimalType;
@@ -41,6 +42,9 @@ import org.apache.fluss.types.TinyIntType;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /** Convert from Fluss's data type to Iceberg's data type. */
 public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {
 
@@ -168,6 +172,28 @@ public class FlussDataTypeToIcebergDataType implements 
DataTypeVisitor<Type> {
 
     @Override
     public Type visit(RowType rowType) {
-        throw new UnsupportedOperationException("Unsupported row type");
+        List<Types.NestedField> fields = new ArrayList<>();
+
+        for (DataField field : rowType.getFields()) {
+            Type fieldType = field.getType().accept(this);
+
+            if (field.getType().isNullable()) {
+                fields.add(
+                        Types.NestedField.optional(
+                                getNextId(),
+                                field.getName(),
+                                fieldType,
+                                field.getDescription().orElse(null)));
+            } else {
+                fields.add(
+                        Types.NestedField.required(
+                                getNextId(),
+                                field.getName(),
+                                fieldType,
+                                field.getDescription().orElse(null)));
+            }
+        }
+
+        return Types.StructType.of(fields);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
index 70dc8ea6f..6d7070001 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.lake.iceberg.source;
 
+import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
 import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.ArrayType;
@@ -179,6 +180,15 @@ public class FlussRowAsIcebergRecord implements Record {
                         ? null
                         : new FlussArrayAsIcebergList(array, 
arrayType.getElementType());
             };
+        } else if (flussType instanceof RowType) {
+            RowType rowType = (RowType) flussType;
+            Types.StructType nestedStructType =
+                    (Types.StructType) 
rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
+
+            return row -> {
+                InternalRow nestedRow = row.getRow(pos, 
rowType.getFieldCount());
+                return new FlussRowAsIcebergRecord(nestedStructType, rowType, 
nestedRow);
+            };
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported data type conversion for Fluss type: "
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 8143433b0..2a560e8d1 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -44,6 +44,10 @@ public class IcebergRecordAsFlussRow implements InternalRow {
 
     public IcebergRecordAsFlussRow() {}
 
+    public IcebergRecordAsFlussRow(Record icebergRecord) {
+        this.icebergRecord = icebergRecord;
+    }
+
     public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) {
         this.icebergRecord = icebergRecord;
         return this;
@@ -169,7 +173,18 @@ public class IcebergRecordAsFlussRow implements 
InternalRow {
 
     @Override
     public InternalRow getRow(int pos, int numFields) {
-        // TODO: Support Row type conversion from Iceberg to Fluss
-        throw new UnsupportedOperationException();
+        Object value = icebergRecord.get(pos);
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Record) {
+            return new IcebergRecordAsFlussRow((Record) value);
+        } else {
+            throw new IllegalArgumentException(
+                    "Expected Iceberg Record for nested row at position "
+                            + pos
+                            + " but found: "
+                            + value.getClass().getName());
+        }
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index 038ebd8a7..b75ce8ad4 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -21,6 +21,7 @@ import 
org.apache.fluss.lake.iceberg.source.FlussRowAsIcebergRecord;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataField;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -38,6 +39,7 @@ import org.apache.iceberg.types.Types;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
@@ -128,7 +130,16 @@ public class IcebergConversions {
         } else if (icebergType instanceof Types.ListType) {
             Types.ListType listType = (Types.ListType) icebergType;
             return 
DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
+        } else if (icebergType.isStructType()) {
+            Types.StructType structType = icebergType.asStructType();
+            List<DataField> fields = new ArrayList<>();
+            for (Types.NestedField nestedField : structType.fields()) {
+                DataType fieldType = 
convertIcebergTypeToFlussType(nestedField.type());
+                fields.add(new DataField(nestedField.name(), fieldType));
+            }
+            return DataTypes.ROW(fields.toArray(new DataField[0]));
         }
+
         throw new UnsupportedOperationException(
                 "Unsupported data type conversion for Iceberg type: "
                         + icebergType.getClass().getName());
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 168f51cd2..ef353d989 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -126,12 +126,12 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
             // check filter push down
             assertThat(plan)
                     .contains("TableSourceScan(")
-                    .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + 
partition + "'")
+                    .contains("LogicalFilter(condition=[=($16, _UTF-16LE'" + 
partition + "'")
                     .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
 
             List<Row> expectedFiltered =
                     writtenRows.stream()
-                            .filter(r -> partition.equals(r.getField(15)))
+                            .filter(r -> partition.equals(r.getField(16)))
                             .collect(Collectors.toList());
 
             List<Row> actualFiltered =
@@ -295,7 +295,12 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                         .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
                         .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
                         .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
-                        .column("f_binary", DataTypes.BINARY(4));
+                        .column("f_binary", DataTypes.BINARY(4))
+                        .column(
+                                "f_row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f_nested_int", 
DataTypes.INT()),
+                                        DataTypes.FIELD("f_nested_string", 
DataTypes.STRING())));
 
         TableDescriptor.Builder tableBuilder =
                 TableDescriptor.builder()
@@ -342,7 +347,8 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                 TimestampLtz.fromEpochMillis(1698235273400L, 
7000),
                                 TimestampNtz.fromMillis(1698235273501L),
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
-                                new byte[] {5, 6, 7, 8}));
+                                new byte[] {5, 6, 7, 8},
+                                row(10, "nested_string")));
 
                 flinkRows.add(
                         Row.of(
@@ -364,7 +370,8 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                                 
Instant.ofEpochMilli(1698235273501L),
                                                 ZoneId.of("UTC"))
                                         .plusNanos(8000),
-                                new byte[] {5, 6, 7, 8}));
+                                new byte[] {5, 6, 7, 8},
+                                Row.of(10, "nested_string")));
             } else {
                 rows.add(
                         row(
@@ -383,6 +390,7 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                 TimestampNtz.fromMillis(1698235273501L),
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
+                                row(10, "nested_string"),
                                 partition));
 
                 flinkRows.add(
@@ -406,6 +414,7 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                                 ZoneId.of("UTC"))
                                         .plusNanos(8000),
                                 new byte[] {5, 6, 7, 8},
+                                Row.of(10, "nested_string"),
                                 partition));
             }
         }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index a8b62e773..6cc48ede0 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -24,8 +24,10 @@ import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -100,7 +102,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273183L, 6000),
                                 new byte[] {1, 2, 3, 4},
                                 new float[] {1.1f, 1.2f, 1.3f},
-                                partition));
+                                partition,
+                                Row.of(1, "nested_row1")));
                 expectedRows.add(
                         Row.of(
                                 true,
@@ -119,7 +122,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273201L, 6000),
                                 new byte[] {1, 2, 3, 4},
                                 new float[] {1.1f, 1.2f, 1.3f},
-                                partition));
+                                partition,
+                                Row.of(2, "nested_row2")));
             }
         } else {
             expectedRows =
@@ -141,7 +145,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                     TimestampNtz.fromMillis(1698235273183L, 
6000),
                                     new byte[] {1, 2, 3, 4},
                                     new float[] {1.1f, 1.2f, 1.3f},
-                                    null),
+                                    null,
+                                    Row.of(1, "nested_row1")),
                             Row.of(
                                     true,
                                     (byte) 10,
@@ -159,7 +164,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                     TimestampNtz.fromMillis(1698235273201L, 
6000),
                                     new byte[] {1, 2, 3, 4},
                                     new float[] {1.1f, 1.2f, 1.3f},
-                                    null));
+                                    null,
+                                    Row.of(2, "nested_row2")));
         }
 
         String query = "select * from " + tableName;
@@ -202,7 +208,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273201L, 6000),
                                 new byte[] {1, 2, 3, 4},
                                 new float[] {1.1f, 1.2f, 1.3f},
-                                partition));
+                                partition,
+                                Row.of(2, "nested_row2")));
                 expectedRows2.add(
                         Row.ofKind(
                                 RowKind.UPDATE_AFTER,
@@ -222,7 +229,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
                                 new float[] {2.1f, 2.2f, 2.3f},
-                                partition));
+                                partition,
+                                Row.of(3, "nested_update")));
             }
         } else {
             expectedRows2.add(
@@ -244,7 +252,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                             TimestampNtz.fromMillis(1698235273201L, 6000),
                             new byte[] {1, 2, 3, 4},
                             new float[] {1.1f, 1.2f, 1.3f},
-                            null));
+                            null,
+                            Row.of(2, "nested_row2")));
             expectedRows2.add(
                     Row.ofKind(
                             RowKind.UPDATE_AFTER,
@@ -264,7 +273,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                             TimestampNtz.fromMillis(1698235273501L, 8000),
                             new byte[] {5, 6, 7, 8},
                             new float[] {2.1f, 2.2f, 2.3f},
-                            null));
+                            null,
+                            Row.of(3, "nested_update")));
         }
 
         if (isPartitioned) {
@@ -349,6 +359,10 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
     }
 
     private void writeFullTypeRow(TablePath tablePath, String partition) 
throws Exception {
+        GenericRow nestedRow = new GenericRow(2);
+        nestedRow.setField(0, 3);
+        nestedRow.setField(1, BinaryString.fromString("nested_update"));
+
         List<InternalRow> rows =
                 Collections.singletonList(
                         row(
@@ -368,7 +382,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
                                 new GenericArray(new float[] {2.1f, 2.2f, 
2.3f}),
-                                partition));
+                                partition,
+                                nestedRow));
         writeRows(tablePath, rows, false);
     }
 
@@ -422,7 +437,12 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                         .column("c14", DataTypes.TIMESTAMP(6))
                         .column("c15", DataTypes.BINARY(4))
                         .column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
-                        .column("c17", DataTypes.STRING());
+                        .column("c17", DataTypes.STRING())
+                        .column(
+                                "c18",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("a", DataTypes.INT()),
+                                        DataTypes.FIELD("b", 
DataTypes.STRING())));
 
         TableDescriptor.Builder tableBuilder =
                 TableDescriptor.builder()
@@ -444,6 +464,14 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
     }
 
     private List<InternalRow> generateKvRowsFullType(@Nullable String 
partition) {
+        GenericRow nestedRow1 = new GenericRow(2);
+        nestedRow1.setField(0, 1);
+        nestedRow1.setField(1, BinaryString.fromString("nested_row1"));
+
+        GenericRow nestedRow2 = new GenericRow(2);
+        nestedRow2.setField(0, 2);
+        nestedRow2.setField(1, BinaryString.fromString("nested_row2"));
+
         return Arrays.asList(
                 row(
                         false,
@@ -462,7 +490,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                         TimestampNtz.fromMillis(1698235273183L, 6000),
                         new byte[] {1, 2, 3, 4},
                         new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
-                        partition),
+                        partition,
+                        nestedRow1),
                 row(
                         true,
                         (byte) 10,
@@ -480,7 +509,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                         TimestampNtz.fromMillis(1698235273201L, 6000),
                         new byte[] {1, 2, 3, 4},
                         new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
-                        partition));
+                        partition,
+                        nestedRow2));
     }
 
     private Map<TableBucket, Long> getBucketLogEndOffset(
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
index 0c3ec679c..247c33d12 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
 
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 
@@ -293,4 +294,123 @@ class FlussRowAsIcebergRecordTest {
         // Test null array
         assertThat(record.get(14)).isNull();
     }
+
+    @Test
+    void testNestedRow() {
+        Types.StructType structType =
+                Types.StructType.of(
+                        // simple row
+                        Types.NestedField.required(
+                                0,
+                                "simple_row",
+                                Types.StructType.of(
+                                        Types.NestedField.required(
+                                                1, "id", 
Types.IntegerType.get()),
+                                        Types.NestedField.required(
+                                                2, "name", 
Types.StringType.get()))),
+                        // nested row
+                        Types.NestedField.required(
+                                3,
+                                "nested_row",
+                                Types.StructType.of(
+                                        Types.NestedField.required(
+                                                4, "id", 
Types.IntegerType.get()),
+                                        Types.NestedField.required(
+                                                5,
+                                                "inner",
+                                                Types.StructType.of(
+                                                        
Types.NestedField.required(
+                                                                6, "val", 
Types.DoubleType.get()),
+                                                        
Types.NestedField.required(
+                                                                7,
+                                                                "flag",
+                                                                
Types.BooleanType.get()))))),
+                        // array row
+                        Types.NestedField.required(
+                                8,
+                                "array_row",
+                                Types.StructType.of(
+                                        Types.NestedField.required(
+                                                9,
+                                                "ids",
+                                                Types.ListType.ofRequired(
+                                                        10, 
Types.IntegerType.get())))),
+                        // nullable row
+                        Types.NestedField.optional(
+                                11,
+                                "nullable_row",
+                                Types.StructType.of(
+                                        Types.NestedField.required(
+                                                12, "id", 
Types.IntegerType.get()))));
+
+        RowType flussRowType =
+                RowType.of(
+                        // simple row
+                        DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT()),
+                                DataTypes.FIELD("name", DataTypes.STRING())),
+                        // nested row
+                        DataTypes.ROW(
+                                DataTypes.FIELD("id", DataTypes.INT()),
+                                DataTypes.FIELD(
+                                        "inner",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("val", 
DataTypes.DOUBLE()),
+                                                DataTypes.FIELD("flag", 
DataTypes.BOOLEAN())))),
+                        // row_with array
+                        DataTypes.ROW(DataTypes.FIELD("ids", 
DataTypes.ARRAY(DataTypes.INT()))),
+                        // nullable row
+                        DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT())));
+
+        GenericRow genericRow = new GenericRow(4);
+
+        // Simple Row
+        GenericRow simpleRow = new GenericRow(2);
+        simpleRow.setField(0, 100);
+        simpleRow.setField(1, BinaryString.fromString("fluss"));
+        genericRow.setField(0, simpleRow);
+
+        // Nested Row
+        GenericRow innerRow = new GenericRow(2);
+        innerRow.setField(0, 3.14);
+        innerRow.setField(1, true);
+        GenericRow nestedRow = new GenericRow(2);
+        nestedRow.setField(0, 200);
+        nestedRow.setField(1, innerRow);
+        genericRow.setField(1, nestedRow);
+
+        // Array Row
+        GenericRow rowWithArray = new GenericRow(1);
+        rowWithArray.setField(0, new GenericArray(new int[] {1, 2, 3}));
+        genericRow.setField(2, rowWithArray);
+
+        // Nullable Row
+        genericRow.setField(3, null);
+
+        FlussRowAsIcebergRecord record = new 
FlussRowAsIcebergRecord(structType, flussRowType);
+        record.internalRow = genericRow;
+
+        // Verify Simple Row
+        Record icebergSimpleRow = (Record) record.get(0);
+        assertThat(icebergSimpleRow.get(0)).isEqualTo(100);
+        assertThat(icebergSimpleRow.get(1)).isEqualTo("fluss");
+
+        // Verify Nested Row
+        Record icebergNestedRow = (Record) record.get(1);
+        assertThat(icebergNestedRow.get(0)).isEqualTo(200);
+        Record icebergInnerRow = (Record) icebergNestedRow.get(1);
+        assertThat(icebergInnerRow.get(0)).isEqualTo(3.14);
+        assertThat(icebergInnerRow.get(1)).isEqualTo(true);
+
+        // Verify Row with Array
+        Record icebergRowWithArray = (Record) record.get(2);
+        List<?> ids = (List<?>) icebergRowWithArray.get(0);
+        assertThat(ids.size()).isEqualTo(3);
+        assertThat(ids.get(0)).isEqualTo(1);
+        assertThat(ids.get(1)).isEqualTo(2);
+        assertThat(ids.get(2)).isEqualTo(3);
+
+        // Verify Nullable Row
+        assertThat(record.get(3)).isNull();
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 42ac4d6fb..2aeb223ce 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.fluss.lake.iceberg.source;
 
+import org.apache.fluss.row.InternalRow;
+
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -61,10 +63,27 @@ class IcebergRecordAsFlussRowTest {
                         optional(11, "timestamp_ltz", 
Types.TimestampType.withZone()),
                         optional(12, "binary_data", Types.BinaryType.get()),
                         optional(13, "char_data", Types.StringType.get()),
+                        optional(
+                                14,
+                                "nested_row",
+                                Types.StructType.of(
+                                        required(15, "city", 
Types.StringType.get()),
+                                        required(
+                                                16,
+                                                "address",
+                                                Types.StructType.of(
+                                                        required(
+                                                                17,
+                                                                "subfield1",
+                                                                
Types.StringType.get()),
+                                                        required(
+                                                                18,
+                                                                "subfield2",
+                                                                
Types.IntegerType.get()))))),
                         // System columns
-                        required(14, "__bucket", Types.IntegerType.get()),
-                        required(15, "__offset", Types.LongType.get()),
-                        required(16, "__timestamp", 
Types.TimestampType.withZone()));
+                        required(19, "__bucket", Types.IntegerType.get()),
+                        required(20, "__offset", Types.LongType.get()),
+                        required(21, "__timestamp", 
Types.TimestampType.withZone()));
 
         record = GenericRecord.create(schema);
     }
@@ -82,7 +101,7 @@ class IcebergRecordAsFlussRowTest {
         icebergRecordAsFlussRow.replaceIcebergRecord(record);
 
         // Should return count excluding system columns (3 system columns)
-        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
     }
 
     @Test
@@ -145,6 +164,35 @@ class IcebergRecordAsFlussRowTest {
                 .isEqualTo("Hello"); // char_data
 
         // Test field count (excluding system columns)
-        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
+    }
+
+    @Test
+    void testNestedRow() {
+        String cityValue = "Seoul";
+        String subfield1Value = "string value";
+        Integer subfield2Value = 12345;
+        Record nestedRecord =
+                
GenericRecord.create(record.struct().fields().get(13).type().asStructType());
+        nestedRecord.setField("city", "Seoul");
+
+        Record deepNestedRecord =
+                
GenericRecord.create(nestedRecord.struct().fields().get(1).type().asStructType());
+        deepNestedRecord.setField("subfield1", subfield1Value);
+        deepNestedRecord.setField("subfield2", subfield2Value);
+
+        nestedRecord.setField("address", deepNestedRecord);
+
+        record.setField("nested_row", nestedRecord);
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
+
+        InternalRow nestedRow = icebergRecordAsFlussRow.getRow(13, 2);
+        assertThat(nestedRow).isNotNull();
+        assertThat(nestedRow.getString(0).toString()).isEqualTo(cityValue);
+
+        InternalRow deepNestedRow = nestedRow.getRow(1, 2);
+        assertThat(deepNestedRow).isNotNull();
+        
assertThat(deepNestedRow.getString(0).toString()).isEqualTo(subfield1Value);
+        assertThat(deepNestedRow.getInt(1)).isEqualTo(subfield2Value);
     }
 }

Reply via email to