This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 3b74d6b30 [lake/iceberg] Support nested row types for Iceberg tiering
(#2278)
3b74d6b30 is described below
commit 3b74d6b302217b208394330bd9fcb447c4e40543
Author: SeungMin <[email protected]>
AuthorDate: Fri Jan 9 15:32:09 2026 +0900
[lake/iceberg] Support nested row types for Iceberg tiering (#2278)
---
.../iceberg/FlussDataTypeToIcebergDataType.java | 28 ++++-
.../iceberg/source/FlussRowAsIcebergRecord.java | 10 ++
.../iceberg/source/IcebergRecordAsFlussRow.java | 19 +++-
.../lake/iceberg/utils/IcebergConversions.java | 11 ++
.../flink/FlinkUnionReadLogTableITCase.java | 19 +++-
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 54 +++++++---
.../source/FlussRowAsIcebergRecordTest.java | 120 +++++++++++++++++++++
.../source/IcebergRecordAsFlussRowTest.java | 58 +++++++++-
8 files changed, 294 insertions(+), 25 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index 1c093482a..bd01d08af 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -23,6 +23,7 @@ import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataTypeVisitor;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
@@ -41,6 +42,9 @@ import org.apache.fluss.types.TinyIntType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import java.util.ArrayList;
+import java.util.List;
+
/** Convert from Fluss's data type to Iceberg's data type. */
public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {
@@ -168,6 +172,28 @@ public class FlussDataTypeToIcebergDataType implements
DataTypeVisitor<Type> {
@Override
public Type visit(RowType rowType) {
- throw new UnsupportedOperationException("Unsupported row type");
+ List<Types.NestedField> fields = new ArrayList<>();
+
+ for (DataField field : rowType.getFields()) {
+ Type fieldType = field.getType().accept(this);
+
+ if (field.getType().isNullable()) {
+ fields.add(
+ Types.NestedField.optional(
+ getNextId(),
+ field.getName(),
+ fieldType,
+ field.getDescription().orElse(null)));
+ } else {
+ fields.add(
+ Types.NestedField.required(
+ getNextId(),
+ field.getName(),
+ fieldType,
+ field.getDescription().orElse(null)));
+ }
+ }
+
+ return Types.StructType.of(fields);
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
index 70dc8ea6f..6d7070001 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -17,6 +17,7 @@
package org.apache.fluss.lake.iceberg.source;
+import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
@@ -179,6 +180,15 @@ public class FlussRowAsIcebergRecord implements Record {
? null
: new FlussArrayAsIcebergList(array,
arrayType.getElementType());
};
+ } else if (flussType instanceof RowType) {
+ RowType rowType = (RowType) flussType;
+ Types.StructType nestedStructType =
+ (Types.StructType)
rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
+
+ return row -> {
+ InternalRow nestedRow = row.getRow(pos,
rowType.getFieldCount());
+ return new FlussRowAsIcebergRecord(nestedStructType, rowType,
nestedRow);
+ };
} else {
throw new UnsupportedOperationException(
"Unsupported data type conversion for Fluss type: "
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 8143433b0..2a560e8d1 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -44,6 +44,10 @@ public class IcebergRecordAsFlussRow implements InternalRow {
public IcebergRecordAsFlussRow() {}
+ public IcebergRecordAsFlussRow(Record icebergRecord) {
+ this.icebergRecord = icebergRecord;
+ }
+
public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) {
this.icebergRecord = icebergRecord;
return this;
@@ -169,7 +173,18 @@ public class IcebergRecordAsFlussRow implements
InternalRow {
@Override
public InternalRow getRow(int pos, int numFields) {
- // TODO: Support Row type conversion from Iceberg to Fluss
- throw new UnsupportedOperationException();
+ Object value = icebergRecord.get(pos);
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof Record) {
+ return new IcebergRecordAsFlussRow((Record) value);
+ } else {
+ throw new IllegalArgumentException(
+ "Expected Iceberg Record for nested row at position "
+ + pos
+ + " but found: "
+ + value.getClass().getName());
+ }
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index 038ebd8a7..b75ce8ad4 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -21,6 +21,7 @@ import
org.apache.fluss.lake.iceberg.source.FlussRowAsIcebergRecord;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
@@ -38,6 +39,7 @@ import org.apache.iceberg.types.Types;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import static
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
@@ -128,7 +130,16 @@ public class IcebergConversions {
} else if (icebergType instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) icebergType;
return
DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
+ } else if (icebergType.isStructType()) {
+ Types.StructType structType = icebergType.asStructType();
+ List<DataField> fields = new ArrayList<>();
+ for (Types.NestedField nestedField : structType.fields()) {
+ DataType fieldType =
convertIcebergTypeToFlussType(nestedField.type());
+ fields.add(new DataField(nestedField.name(), fieldType));
+ }
+ return DataTypes.ROW(fields.toArray(new DataField[0]));
}
+
throw new UnsupportedOperationException(
"Unsupported data type conversion for Iceberg type: "
+ icebergType.getClass().getName());
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 168f51cd2..ef353d989 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -126,12 +126,12 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
// check filter push down
assertThat(plan)
.contains("TableSourceScan(")
- .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" +
partition + "'")
+ .contains("LogicalFilter(condition=[=($16, _UTF-16LE'" +
partition + "'")
.contains("filter=[=(p, _UTF-16LE'" + partition + "'");
List<Row> expectedFiltered =
writtenRows.stream()
- .filter(r -> partition.equals(r.getField(15)))
+ .filter(r -> partition.equals(r.getField(16)))
.collect(Collectors.toList());
List<Row> actualFiltered =
@@ -295,7 +295,12 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
.column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
.column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
.column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
- .column("f_binary", DataTypes.BINARY(4));
+ .column("f_binary", DataTypes.BINARY(4))
+ .column(
+ "f_row",
+ DataTypes.ROW(
+ DataTypes.FIELD("f_nested_int",
DataTypes.INT()),
+ DataTypes.FIELD("f_nested_string",
DataTypes.STRING())));
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -342,7 +347,8 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
TimestampLtz.fromEpochMillis(1698235273400L,
7000),
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
- new byte[] {5, 6, 7, 8}));
+ new byte[] {5, 6, 7, 8},
+ row(10, "nested_string")));
flinkRows.add(
Row.of(
@@ -364,7 +370,8 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
Instant.ofEpochMilli(1698235273501L),
ZoneId.of("UTC"))
.plusNanos(8000),
- new byte[] {5, 6, 7, 8}));
+ new byte[] {5, 6, 7, 8},
+ Row.of(10, "nested_string")));
} else {
rows.add(
row(
@@ -383,6 +390,7 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
+ row(10, "nested_string"),
partition));
flinkRows.add(
@@ -406,6 +414,7 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
ZoneId.of("UTC"))
.plusNanos(8000),
new byte[] {5, 6, 7, 8},
+ Row.of(10, "nested_string"),
partition));
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index a8b62e773..6cc48ede0 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -24,8 +24,10 @@ import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -100,7 +102,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- partition));
+ partition,
+ Row.of(1, "nested_row1")));
expectedRows.add(
Row.of(
true,
@@ -119,7 +122,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- partition));
+ partition,
+ Row.of(2, "nested_row2")));
}
} else {
expectedRows =
@@ -141,7 +145,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273183L,
6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- null),
+ null,
+ Row.of(1, "nested_row1")),
Row.of(
true,
(byte) 10,
@@ -159,7 +164,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L,
6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- null));
+ null,
+ Row.of(2, "nested_row2")));
}
String query = "select * from " + tableName;
@@ -202,7 +208,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- partition));
+ partition,
+ Row.of(2, "nested_row2")));
expectedRows2.add(
Row.ofKind(
RowKind.UPDATE_AFTER,
@@ -222,7 +229,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new float[] {2.1f, 2.2f, 2.3f},
- partition));
+ partition,
+ Row.of(3, "nested_update")));
}
} else {
expectedRows2.add(
@@ -244,7 +252,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- null));
+ null,
+ Row.of(2, "nested_row2")));
expectedRows2.add(
Row.ofKind(
RowKind.UPDATE_AFTER,
@@ -264,7 +273,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new float[] {2.1f, 2.2f, 2.3f},
- null));
+ null,
+ Row.of(3, "nested_update")));
}
if (isPartitioned) {
@@ -349,6 +359,10 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
}
private void writeFullTypeRow(TablePath tablePath, String partition)
throws Exception {
+ GenericRow nestedRow = new GenericRow(2);
+ nestedRow.setField(0, 3);
+ nestedRow.setField(1, BinaryString.fromString("nested_update"));
+
List<InternalRow> rows =
Collections.singletonList(
row(
@@ -368,7 +382,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new GenericArray(new float[] {2.1f, 2.2f,
2.3f}),
- partition));
+ partition,
+ nestedRow));
writeRows(tablePath, rows, false);
}
@@ -422,7 +437,12 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
.column("c14", DataTypes.TIMESTAMP(6))
.column("c15", DataTypes.BINARY(4))
.column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
- .column("c17", DataTypes.STRING());
+ .column("c17", DataTypes.STRING())
+ .column(
+ "c18",
+ DataTypes.ROW(
+ DataTypes.FIELD("a", DataTypes.INT()),
+ DataTypes.FIELD("b",
DataTypes.STRING())));
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -444,6 +464,14 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
}
private List<InternalRow> generateKvRowsFullType(@Nullable String
partition) {
+ GenericRow nestedRow1 = new GenericRow(2);
+ nestedRow1.setField(0, 1);
+ nestedRow1.setField(1, BinaryString.fromString("nested_row1"));
+
+ GenericRow nestedRow2 = new GenericRow(2);
+ nestedRow2.setField(0, 2);
+ nestedRow2.setField(1, BinaryString.fromString("nested_row2"));
+
return Arrays.asList(
row(
false,
@@ -462,7 +490,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
- partition),
+ partition,
+ nestedRow1),
row(
true,
(byte) 10,
@@ -480,7 +509,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
- partition));
+ partition,
+ nestedRow2));
}
private Map<TableBucket, Long> getBucketLogEndOffset(
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
index 0c3ec679c..247c33d12 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.row.GenericRow;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
@@ -293,4 +294,123 @@ class FlussRowAsIcebergRecordTest {
// Test null array
assertThat(record.get(14)).isNull();
}
+
+ @Test
+ void testNestedRow() {
+ Types.StructType structType =
+ Types.StructType.of(
+ // simple row
+ Types.NestedField.required(
+ 0,
+ "simple_row",
+ Types.StructType.of(
+ Types.NestedField.required(
+ 1, "id",
Types.IntegerType.get()),
+ Types.NestedField.required(
+ 2, "name",
Types.StringType.get()))),
+ // nested row
+ Types.NestedField.required(
+ 3,
+ "nested_row",
+ Types.StructType.of(
+ Types.NestedField.required(
+ 4, "id",
Types.IntegerType.get()),
+ Types.NestedField.required(
+ 5,
+ "inner",
+ Types.StructType.of(
+
Types.NestedField.required(
+ 6, "val",
Types.DoubleType.get()),
+
Types.NestedField.required(
+ 7,
+ "flag",
+
Types.BooleanType.get()))))),
+ // array row
+ Types.NestedField.required(
+ 8,
+ "array_row",
+ Types.StructType.of(
+ Types.NestedField.required(
+ 9,
+ "ids",
+ Types.ListType.ofRequired(
+ 10,
Types.IntegerType.get())))),
+ // nullable row
+ Types.NestedField.optional(
+ 11,
+ "nullable_row",
+ Types.StructType.of(
+ Types.NestedField.required(
+ 12, "id",
Types.IntegerType.get()))));
+
+ RowType flussRowType =
+ RowType.of(
+ // simple row
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD("name", DataTypes.STRING())),
+ // nested row
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD(
+ "inner",
+ DataTypes.ROW(
+ DataTypes.FIELD("val",
DataTypes.DOUBLE()),
+ DataTypes.FIELD("flag",
DataTypes.BOOLEAN())))),
+                        // row with array
+ DataTypes.ROW(DataTypes.FIELD("ids",
DataTypes.ARRAY(DataTypes.INT()))),
+ // nullable row
+ DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT())));
+
+ GenericRow genericRow = new GenericRow(4);
+
+ // Simple Row
+ GenericRow simpleRow = new GenericRow(2);
+ simpleRow.setField(0, 100);
+ simpleRow.setField(1, BinaryString.fromString("fluss"));
+ genericRow.setField(0, simpleRow);
+
+ // Nested Row
+ GenericRow innerRow = new GenericRow(2);
+ innerRow.setField(0, 3.14);
+ innerRow.setField(1, true);
+ GenericRow nestedRow = new GenericRow(2);
+ nestedRow.setField(0, 200);
+ nestedRow.setField(1, innerRow);
+ genericRow.setField(1, nestedRow);
+
+ // Array Row
+ GenericRow rowWithArray = new GenericRow(1);
+ rowWithArray.setField(0, new GenericArray(new int[] {1, 2, 3}));
+ genericRow.setField(2, rowWithArray);
+
+ // Nullable Row
+ genericRow.setField(3, null);
+
+ FlussRowAsIcebergRecord record = new
FlussRowAsIcebergRecord(structType, flussRowType);
+ record.internalRow = genericRow;
+
+ // Verify Simple Row
+ Record icebergSimpleRow = (Record) record.get(0);
+ assertThat(icebergSimpleRow.get(0)).isEqualTo(100);
+ assertThat(icebergSimpleRow.get(1)).isEqualTo("fluss");
+
+ // Verify Nested Row
+ Record icebergNestedRow = (Record) record.get(1);
+ assertThat(icebergNestedRow.get(0)).isEqualTo(200);
+ Record icebergInnerRow = (Record) icebergNestedRow.get(1);
+ assertThat(icebergInnerRow.get(0)).isEqualTo(3.14);
+ assertThat(icebergInnerRow.get(1)).isEqualTo(true);
+
+ // Verify Row with Array
+ Record icebergRowWithArray = (Record) record.get(2);
+ List<?> ids = (List<?>) icebergRowWithArray.get(0);
+ assertThat(ids.size()).isEqualTo(3);
+ assertThat(ids.get(0)).isEqualTo(1);
+ assertThat(ids.get(1)).isEqualTo(2);
+ assertThat(ids.get(2)).isEqualTo(3);
+
+ // Verify Nullable Row
+ assertThat(record.get(3)).isNull();
+ }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 42ac4d6fb..2aeb223ce 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -18,6 +18,8 @@
package org.apache.fluss.lake.iceberg.source;
+import org.apache.fluss.row.InternalRow;
+
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -61,10 +63,27 @@ class IcebergRecordAsFlussRowTest {
optional(11, "timestamp_ltz",
Types.TimestampType.withZone()),
optional(12, "binary_data", Types.BinaryType.get()),
optional(13, "char_data", Types.StringType.get()),
+ optional(
+ 14,
+ "nested_row",
+ Types.StructType.of(
+ required(15, "city",
Types.StringType.get()),
+ required(
+ 16,
+ "address",
+ Types.StructType.of(
+ required(
+ 17,
+ "subfield1",
+
Types.StringType.get()),
+ required(
+ 18,
+ "subfield2",
+
Types.IntegerType.get()))))),
// System columns
- required(14, "__bucket", Types.IntegerType.get()),
- required(15, "__offset", Types.LongType.get()),
- required(16, "__timestamp",
Types.TimestampType.withZone()));
+ required(19, "__bucket", Types.IntegerType.get()),
+ required(20, "__offset", Types.LongType.get()),
+ required(21, "__timestamp",
Types.TimestampType.withZone()));
record = GenericRecord.create(schema);
}
@@ -82,7 +101,7 @@ class IcebergRecordAsFlussRowTest {
icebergRecordAsFlussRow.replaceIcebergRecord(record);
// Should return count excluding system columns (3 system columns)
- assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
+ assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
}
@Test
@@ -145,6 +164,35 @@ class IcebergRecordAsFlussRowTest {
.isEqualTo("Hello"); // char_data
// Test field count (excluding system columns)
- assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
+ assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
+ }
+
+ @Test
+ void testNestedRow() {
+ String cityValue = "Seoul";
+ String subfield1Value = "string value";
+ Integer subfield2Value = 12345;
+ Record nestedRecord =
+
GenericRecord.create(record.struct().fields().get(13).type().asStructType());
+        nestedRecord.setField("city", cityValue);
+
+ Record deepNestedRecord =
+
GenericRecord.create(nestedRecord.struct().fields().get(1).type().asStructType());
+ deepNestedRecord.setField("subfield1", subfield1Value);
+ deepNestedRecord.setField("subfield2", subfield2Value);
+
+ nestedRecord.setField("address", deepNestedRecord);
+
+ record.setField("nested_row", nestedRecord);
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
+
+ InternalRow nestedRow = icebergRecordAsFlussRow.getRow(13, 2);
+ assertThat(nestedRow).isNotNull();
+ assertThat(nestedRow.getString(0).toString()).isEqualTo(cityValue);
+
+ InternalRow deepNestedRow = nestedRow.getRow(1, 2);
+ assertThat(deepNestedRow).isNotNull();
+
assertThat(deepNestedRow.getString(0).toString()).isEqualTo(subfield1Value);
+ assertThat(deepNestedRow.getInt(1)).isEqualTo(subfield2Value);
}
}