This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new b2fa4a9d3 [lake/iceberg] Support tier array type for iceberg (#2266)
b2fa4a9d3 is described below

commit b2fa4a9d3225f0b52a3a3637c24b9a3182a56fae
Author: ForwardXu <[email protected]>
AuthorDate: Mon Jan 5 12:00:50 2026 +0800

    [lake/iceberg] Support tier array type for iceberg (#2266)
---
 .../iceberg/FlussDataTypeToIcebergDataType.java    |  31 ++-
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  11 +-
 .../iceberg/source/FlussArrayAsIcebergList.java    | 122 +++++++++
 .../iceberg/source/FlussRowAsIcebergRecord.java    |  10 +
 .../iceberg/source/IcebergArrayAsFlussArray.java   | 206 ++++++++++++++
 .../iceberg/source/IcebergRecordAsFlussRow.java    |   9 +-
 .../lake/iceberg/utils/IcebergConversions.java     |   3 +
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java |  19 +-
 .../source/FlussRowAsIcebergRecordTest.java        | 296 +++++++++++++++++++++
 .../lake/iceberg/tiering/IcebergTieringTest.java   |   2 +-
 .../integrate-data-lakes/iceberg.md                |   1 +
 11 files changed, 699 insertions(+), 11 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index aa1a97b44..1c093482a 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -47,6 +47,30 @@ public class FlussDataTypeToIcebergDataType implements 
DataTypeVisitor<Type> {
     public static final FlussDataTypeToIcebergDataType INSTANCE =
             new FlussDataTypeToIcebergDataType();
 
+    private final RowType root; // null unless built via the RowType constructor
+    private int nextId; // id returned by the next getNextId() call; advanced on every call
+
+    FlussDataTypeToIcebergDataType() {
+        // Same state as the (int) constructor with a start id of 0: no root, ids begin at 0.
+        this(0);
+    }
+
+    FlussDataTypeToIcebergDataType(int startId) {
+        this.nextId = startId; // first id that getNextId() will return
+        this.root = null;
+    }
+
+    FlussDataTypeToIcebergDataType(RowType root) {
+        this.nextId = root.getFieldCount(); // ids start at the root's field count
+        this.root = root;
+    }
+
+    private int getNextId() {
+        // Post-increment: hand out the current id, then advance the counter by one so that
+        // consecutive calls return consecutive ids.
+        return nextId++;
+    }
+
     @Override
     public Type visit(CharType charType) {
         return Types.StringType.get();
@@ -129,7 +153,12 @@ public class FlussDataTypeToIcebergDataType implements 
DataTypeVisitor<Type> {
 
     @Override
     public Type visit(ArrayType arrayType) {
-        throw new UnsupportedOperationException("Unsupported array type");
+        // Convert the element first so ids needed inside it are taken before this list's id.
+        Type elementType = arrayType.getElementType().accept(this);
+        int elementId = getNextId();
+        return arrayType.getElementType().isNullable()
+                ? Types.ListType.ofOptional(elementId, elementType)
+                : Types.ListType.ofRequired(elementId, elementType);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 03afa011c..9bd9d54e6 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -178,6 +178,11 @@ public class IcebergLakeCatalog implements LakeCatalog {
         List<Types.NestedField> fields = new ArrayList<>();
         int fieldId = 0;
 
+        int totalTopLevelFields =
+                tableDescriptor.getSchema().getColumns().size() + 
SYSTEM_COLUMNS.size();
+        FlussDataTypeToIcebergDataType converter =
+                new FlussDataTypeToIcebergDataType(totalTopLevelFields);
+
         // general columns
         for (org.apache.fluss.metadata.Schema.Column column :
                 tableDescriptor.getSchema().getColumns()) {
@@ -192,16 +197,14 @@ public class IcebergLakeCatalog implements LakeCatalog {
                         Types.NestedField.optional(
                                 fieldId++,
                                 colName,
-                                column.getDataType()
-                                        
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
+                                column.getDataType().accept(converter),
                                 column.getComment().orElse(null));
             } else {
                 field =
                         Types.NestedField.required(
                                 fieldId++,
                                 colName,
-                                column.getDataType()
-                                        
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
+                                column.getDataType().accept(converter),
                                 column.getComment().orElse(null));
             }
             fields.add(field);
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
new file mode 100644
index 000000000..2671e045c
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.DateTimeUtils;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.AbstractList;
+
+/**
+ * Read-only {@link java.util.List} view over a Fluss {@link InternalArray} for Iceberg.
+ *
+ * <p>Elements are converted on every {@link #get(int)} call based on the Fluss element type;
+ * nested arrays are wrapped recursively in another {@code FlussArrayAsIcebergList}.
+ */
+public class FlussArrayAsIcebergList extends AbstractList<Object> {
+
+    private final InternalArray flussArray;
+    private final DataType elementType;
+
+    /**
+     * @param flussArray the Fluss array to expose as a list
+     * @param elementType the Fluss type of the array's elements, used to pick the conversion
+     */
+    public FlussArrayAsIcebergList(InternalArray flussArray, DataType elementType) {
+        this.flussArray = flussArray;
+        this.elementType = elementType;
+    }
+
+    /**
+     * Returns the element at {@code index}, converted according to the Fluss element type.
+     *
+     * @param index position of the element in the backing array
+     * @return {@code null} for a null element, otherwise the converted value
+     * @throws IndexOutOfBoundsException if {@code index} is outside {@code [0, size())}
+     * @throws UnsupportedOperationException if the element type has no conversion here
+     */
+    @Override
+    public Object get(int index) {
+        // Honor the List#get contract explicitly instead of relying on the backing array to
+        // reject an out-of-range index.
+        int size = flussArray.size();
+        if (index < 0 || index >= size) {
+            throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
+        }
+
+        if (flussArray.isNullAt(index)) {
+            return null;
+        }
+
+        if (elementType instanceof BooleanType) {
+            return flussArray.getBoolean(index);
+        } else if (elementType instanceof TinyIntType) {
+            // TINYINT is widened to int.
+            return (int) flussArray.getByte(index);
+        } else if (elementType instanceof SmallIntType) {
+            // SMALLINT is widened to int.
+            return (int) flussArray.getShort(index);
+        } else if (elementType instanceof IntType) {
+            return flussArray.getInt(index);
+        } else if (elementType instanceof BigIntType) {
+            return flussArray.getLong(index);
+        } else if (elementType instanceof FloatType) {
+            return flussArray.getFloat(index);
+        } else if (elementType instanceof DoubleType) {
+            return flussArray.getDouble(index);
+        } else if (elementType instanceof StringType) {
+            return flussArray.getString(index).toString();
+        } else if (elementType instanceof CharType) {
+            CharType charType = (CharType) elementType;
+            return flussArray.getChar(index, charType.getLength()).toString();
+        } else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
+            return ByteBuffer.wrap(flussArray.getBytes(index));
+        } else if (elementType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) elementType;
+            return flussArray
+                    .getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
+                    .toBigDecimal();
+        } else if (elementType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
+            return toIcebergTimestampLtz(
+                    flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
+        } else if (elementType instanceof TimestampType) {
+            TimestampType tsType = (TimestampType) elementType;
+            return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
+        } else if (elementType instanceof DateType) {
+            return DateTimeUtils.toLocalDate(flussArray.getInt(index));
+        } else if (elementType instanceof TimeType) {
+            return DateTimeUtils.toLocalTime(flussArray.getInt(index));
+        } else if (elementType instanceof ArrayType) {
+            // Nested arrays are wrapped lazily with the inner element type.
+            InternalArray innerArray = flussArray.getArray(index);
+            return innerArray == null
+                    ? null
+                    : new FlussArrayAsIcebergList(
+                            innerArray, ((ArrayType) elementType).getElementType());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported array element type conversion for Fluss type: "
+                            + elementType.getClass().getSimpleName());
+        }
+    }
+
+    /** Returns the number of elements in the backing Fluss array. */
+    @Override
+    public int size() {
+        return flussArray.size();
+    }
+
+    /** Converts an instant to an {@link OffsetDateTime} at the UTC offset. */
+    private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
+        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
index 805bddcb6..70dc8ea6f 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -17,7 +17,9 @@
 
 package org.apache.fluss.lake.iceberg.source;
 
+import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BigIntType;
 import org.apache.fluss.types.BinaryType;
 import org.apache.fluss.types.BooleanType;
@@ -169,6 +171,14 @@ public class FlussRowAsIcebergRecord implements Record {
             return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
         } else if (flussType instanceof TimeType) {
             return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
+        } else if (flussType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) flussType;
+            return row -> {
+                InternalArray array = row.getArray(pos);
+                return array == null
+                        ? null
+                        : new FlussArrayAsIcebergList(array, 
arrayType.getElementType());
+            };
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported data type conversion for Fluss type: "
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
new file mode 100644
index 000000000..23aecefec
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.utils.BytesUtils;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+
+/**
+ * Adapter that exposes an Iceberg list value as a Fluss {@link InternalArray}.
+ *
+ * <p>Each accessor casts the list element to the Java type it expects (for example {@link
+ * Integer} for byte/short/int, {@link String} for char/string, {@link ByteBuffer} for
+ * binary/bytes), so calling an accessor that does not match the element's runtime type fails
+ * with a {@link ClassCastException}. The {@code to*Array} methods unbox every element and
+ * therefore fail with a {@link NullPointerException} if the list contains a null element.
+ */
+public class IcebergArrayAsFlussArray implements InternalArray {
+
+    private final List<?> icebergList;
+
+    /** @param icebergList the Iceberg list value to wrap; elements are read on demand */
+    public IcebergArrayAsFlussArray(List<?> icebergList) {
+        this.icebergList = icebergList;
+    }
+
+    @Override
+    public int size() {
+        return icebergList.size();
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return icebergList.get(pos) == null;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return (boolean) icebergList.get(pos);
+    }
+
+    /** Reads the element as an {@link Integer} and narrows it to {@code byte}. */
+    @Override
+    public byte getByte(int pos) {
+        Object value = icebergList.get(pos);
+        return ((Integer) value).byteValue();
+    }
+
+    /** Reads the element as an {@link Integer} and narrows it to {@code short}. */
+    @Override
+    public short getShort(int pos) {
+        Object value = icebergList.get(pos);
+        return ((Integer) value).shortValue();
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return (Integer) icebergList.get(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return (Long) icebergList.get(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return (float) icebergList.get(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return (double) icebergList.get(pos);
+    }
+
+    /** Reads the element as a {@link String}; the {@code length} argument is not used. */
+    @Override
+    public BinaryString getChar(int pos, int length) {
+        // Use BinaryString.fromString rather than String#getBytes(), whose result depends on
+        // the platform default charset.
+        return BinaryString.fromString((String) icebergList.get(pos));
+    }
+
+    /** Reads the element as a {@link String}. */
+    @Override
+    public BinaryString getString(int pos) {
+        // Use BinaryString.fromString rather than String#getBytes(), whose result depends on
+        // the platform default charset.
+        return BinaryString.fromString((String) icebergList.get(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        BigDecimal bigDecimal = (BigDecimal) icebergList.get(pos);
+        return Decimal.fromBigDecimal(bigDecimal, precision, scale);
+    }
+
+    /** Reads the element as a {@link LocalDateTime}; {@code precision} is not used. */
+    @Override
+    public TimestampNtz getTimestampNtz(int pos, int precision) {
+        LocalDateTime localDateTime = (LocalDateTime) icebergList.get(pos);
+        return TimestampNtz.fromLocalDateTime(localDateTime);
+    }
+
+    /** Reads the element as an {@link OffsetDateTime}; {@code precision} is not used. */
+    @Override
+    public TimestampLtz getTimestampLtz(int pos, int precision) {
+        OffsetDateTime offsetDateTime = (OffsetDateTime) icebergList.get(pos);
+        return TimestampLtz.fromInstant(offsetDateTime.toInstant());
+    }
+
+    /** Reads the element as a {@link ByteBuffer}; the {@code length} argument is not used. */
+    @Override
+    public byte[] getBinary(int pos, int length) {
+        ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos);
+        return BytesUtils.toArray(byteBuffer);
+    }
+
+    @Override
+    public byte[] getBytes(int pos) {
+        ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos);
+        return BytesUtils.toArray(byteBuffer);
+    }
+
+    /** Wraps a nested list element in another adapter, or returns {@code null} if absent. */
+    @Override
+    public InternalArray getArray(int pos) {
+        List<?> nestedList = (List<?>) icebergList.get(pos);
+        return nestedList == null ? null : new IcebergArrayAsFlussArray(nestedList);
+    }
+
+    /** Row elements are not supported by this adapter. */
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        boolean[] result = new boolean[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = (boolean) icebergList.get(i);
+        }
+        return result;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        byte[] result = new byte[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = ((Integer) icebergList.get(i)).byteValue();
+        }
+        return result;
+    }
+
+    @Override
+    public short[] toShortArray() {
+        short[] result = new short[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = ((Integer) icebergList.get(i)).shortValue();
+        }
+        return result;
+    }
+
+    @Override
+    public int[] toIntArray() {
+        int[] result = new int[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = (int) icebergList.get(i);
+        }
+        return result;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        long[] result = new long[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = (long) icebergList.get(i);
+        }
+        return result;
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        float[] result = new float[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = (float) icebergList.get(i);
+        }
+        return result;
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        double[] result = new double[icebergList.size()];
+        for (int i = 0; i < icebergList.size(); i++) {
+            result[i] = (double) icebergList.get(i);
+        }
+        return result;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 2bd721716..8143433b0 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -33,6 +33,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
+import java.util.List;
 
 import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
 
@@ -152,8 +153,12 @@ public class IcebergRecordAsFlussRow implements 
InternalRow {
 
     @Override
     public InternalArray getArray(int pos) {
-        // TODO: Support Array type conversion from Iceberg to Fluss
-        throw new UnsupportedOperationException();
+        // A null field stays null; any other value is cast to java.util.List and wrapped
+        // in the Iceberg-list-to-Fluss-array adapter.
+        Object listValue = icebergRecord.get(pos);
+        return listValue == null
+                ? null
+                : new IcebergArrayAsFlussArray((List<?>) listValue);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index 4d7582c53..038ebd8a7 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -125,6 +125,9 @@ public class IcebergConversions {
         } else if (icebergType instanceof Types.DecimalType) {
             Types.DecimalType decimalType = (Types.DecimalType) icebergType;
             return DataTypes.DECIMAL(decimalType.precision(), 
decimalType.scale());
+        } else if (icebergType instanceof Types.ListType) {
+            Types.ListType listType = (Types.ListType) icebergType;
+            return 
DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
         }
         throw new UnsupportedOperationException(
                 "Unsupported data type conversion for Iceberg type: "
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 0518290a5..a8b62e773 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -98,6 +99,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273183L),
                                 TimestampNtz.fromMillis(1698235273183L, 6000),
                                 new byte[] {1, 2, 3, 4},
+                                new float[] {1.1f, 1.2f, 1.3f},
                                 partition));
                 expectedRows.add(
                         Row.of(
@@ -116,6 +118,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273201L),
                                 TimestampNtz.fromMillis(1698235273201L, 6000),
                                 new byte[] {1, 2, 3, 4},
+                                new float[] {1.1f, 1.2f, 1.3f},
                                 partition));
             }
         } else {
@@ -137,6 +140,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                     TimestampNtz.fromMillis(1698235273183L),
                                     TimestampNtz.fromMillis(1698235273183L, 
6000),
                                     new byte[] {1, 2, 3, 4},
+                                    new float[] {1.1f, 1.2f, 1.3f},
                                     null),
                             Row.of(
                                     true,
@@ -154,6 +158,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                     TimestampNtz.fromMillis(1698235273201L),
                                     TimestampNtz.fromMillis(1698235273201L, 
6000),
                                     new byte[] {1, 2, 3, 4},
+                                    new float[] {1.1f, 1.2f, 1.3f},
                                     null));
         }
 
@@ -196,6 +201,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273201L),
                                 TimestampNtz.fromMillis(1698235273201L, 6000),
                                 new byte[] {1, 2, 3, 4},
+                                new float[] {1.1f, 1.2f, 1.3f},
                                 partition));
                 expectedRows2.add(
                         Row.ofKind(
@@ -215,6 +221,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273501L),
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
+                                new float[] {2.1f, 2.2f, 2.3f},
                                 partition));
             }
         } else {
@@ -236,6 +243,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                             TimestampNtz.fromMillis(1698235273201L),
                             TimestampNtz.fromMillis(1698235273201L, 6000),
                             new byte[] {1, 2, 3, 4},
+                            new float[] {1.1f, 1.2f, 1.3f},
                             null));
             expectedRows2.add(
                     Row.ofKind(
@@ -255,6 +263,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                             TimestampNtz.fromMillis(1698235273501L),
                             TimestampNtz.fromMillis(1698235273501L, 8000),
                             new byte[] {5, 6, 7, 8},
+                            new float[] {2.1f, 2.2f, 2.3f},
                             null));
         }
 
@@ -358,6 +367,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                                 TimestampNtz.fromMillis(1698235273501L),
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
+                                new GenericArray(new float[] {2.1f, 2.2f, 
2.3f}),
                                 partition));
         writeRows(tablePath, rows, false);
     }
@@ -411,7 +421,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                         .column("c13", DataTypes.TIMESTAMP(3))
                         .column("c14", DataTypes.TIMESTAMP(6))
                         .column("c15", DataTypes.BINARY(4))
-                        .column("c16", DataTypes.STRING());
+                        .column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .column("c17", DataTypes.STRING());
 
         TableDescriptor.Builder tableBuilder =
                 TableDescriptor.builder()
@@ -421,8 +432,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
 
         if (isPartitioned) {
             tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
-            tableBuilder.partitionedBy("c16");
-            schemaBuilder.primaryKey("c4", "c16");
+            tableBuilder.partitionedBy("c17");
+            schemaBuilder.primaryKey("c4", "c17");
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
         } else {
@@ -450,6 +461,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                         TimestampNtz.fromMillis(1698235273183L),
                         TimestampNtz.fromMillis(1698235273183L, 6000),
                         new byte[] {1, 2, 3, 4},
+                        new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
                         partition),
                 row(
                         true,
@@ -467,6 +479,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase
                         TimestampNtz.fromMillis(1698235273201L),
                         TimestampNtz.fromMillis(1698235273201L, 6000),
                         new byte[] {1, 2, 3, 4},
+                        new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
                         partition));
     }
 
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
new file mode 100644
index 000000000..0c3ec679c
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests verifying that {@link FlussRowAsIcebergRecord} exposes Fluss arrays as lists. */
+class FlussRowAsIcebergRecordTest {
+
+    @Test
+    void testArrayWithAllTypes() {
+        Types.StructType structType =
+                Types.StructType.of(
+                        Types.NestedField.required(
+                                0,
+                                "bool_array",
+                                Types.ListType.ofRequired(1, 
Types.BooleanType.get())),
+                        Types.NestedField.required(
+                                2,
+                                "byte_array",
+                                Types.ListType.ofRequired(3, 
Types.IntegerType.get())),
+                        Types.NestedField.required(
+                                4,
+                                "short_array",
+                                Types.ListType.ofRequired(5, 
Types.IntegerType.get())),
+                        Types.NestedField.required(
+                                6,
+                                "int_array",
+                                Types.ListType.ofRequired(7, 
Types.IntegerType.get())),
+                        Types.NestedField.required(
+                                8,
+                                "long_array",
+                                Types.ListType.ofRequired(9, 
Types.LongType.get())),
+                        Types.NestedField.required(
+                                10,
+                                "float_array",
+                                Types.ListType.ofRequired(11, 
Types.FloatType.get())),
+                        Types.NestedField.required(
+                                12,
+                                "double_array",
+                                Types.ListType.ofRequired(13, 
Types.DoubleType.get())),
+                        Types.NestedField.required(
+                                14,
+                                "string_array",
+                                Types.ListType.ofRequired(15, 
Types.StringType.get())),
+                        Types.NestedField.required(
+                                16,
+                                "decimal_array",
+                                Types.ListType.ofRequired(17, 
Types.DecimalType.of(10, 2))),
+                        Types.NestedField.required(
+                                18,
+                                "timestamp_ntz_array",
+                                Types.ListType.ofRequired(19, 
Types.TimestampType.withoutZone())),
+                        Types.NestedField.required(
+                                20,
+                                "timestamp_ltz_array",
+                                Types.ListType.ofRequired(21, 
Types.TimestampType.withZone())),
+                        Types.NestedField.required(
+                                22,
+                                "binary_array",
+                                Types.ListType.ofRequired(23, 
Types.BinaryType.get())),
+                        Types.NestedField.required(
+                                24,
+                                "nested_array",
+                                Types.ListType.ofRequired(
+                                        25,
+                                        Types.ListType.ofRequired(26, 
Types.IntegerType.get()))),
+                        Types.NestedField.required(
+                                27,
+                                "nullable_int_array",
+                                Types.ListType.ofOptional(28, 
Types.IntegerType.get())),
+                        Types.NestedField.optional(
+                                29,
+                                "null_array",
+                                Types.ListType.ofRequired(30, 
Types.IntegerType.get())));
+
+        RowType flussRowType =
+                RowType.of(
+                        DataTypes.ARRAY(DataTypes.BOOLEAN()),
+                        DataTypes.ARRAY(DataTypes.TINYINT()),
+                        DataTypes.ARRAY(DataTypes.SMALLINT()),
+                        DataTypes.ARRAY(DataTypes.INT()),
+                        DataTypes.ARRAY(DataTypes.BIGINT()),
+                        DataTypes.ARRAY(DataTypes.FLOAT()),
+                        DataTypes.ARRAY(DataTypes.DOUBLE()),
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)),
+                        DataTypes.ARRAY(DataTypes.TIMESTAMP(6)),
+                        DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)),
+                        DataTypes.ARRAY(DataTypes.BYTES()),
+                        DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+                        DataTypes.ARRAY(DataTypes.INT()),
+                        DataTypes.ARRAY(DataTypes.INT()));
+
+        GenericRow genericRow = new GenericRow(15); // one slot per column of flussRowType
+        genericRow.setField(0, new GenericArray(new boolean[] {true, false, 
true}));
+        genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); // TINYINT column
+        genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); // SMALLINT column
+        genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000}));
+        genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 
30000L}));
+        genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 
3.3f}));
+        genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 
3.33}));
+        genericRow.setField(
+                7,
+                new GenericArray(
+                        new Object[] {
+                            BinaryString.fromString("hello"),
+                            BinaryString.fromString("world"),
+                            BinaryString.fromString("test")
+                        }));
+        genericRow.setField(
+                8,
+                new GenericArray(
+                        new Object[] {
+                            Decimal.fromBigDecimal(new BigDecimal("123.45"), 
10, 2),
+                            Decimal.fromBigDecimal(new BigDecimal("678.90"), 
10, 2)
+                        }));
+        genericRow.setField(
+                9,
+                new GenericArray(
+                        new Object[] {
+                            
org.apache.fluss.row.TimestampNtz.fromLocalDateTime(
+                                    LocalDateTime.now()),
+                            
org.apache.fluss.row.TimestampNtz.fromLocalDateTime(
+                                    LocalDateTime.now().plusSeconds(1))
+                        }));
+        genericRow.setField(
+                10,
+                new GenericArray(
+                        new Object[] {
+                            org.apache.fluss.row.TimestampLtz.fromEpochMillis(
+                                    System.currentTimeMillis()),
+                            org.apache.fluss.row.TimestampLtz.fromEpochMillis(
+                                    System.currentTimeMillis() + 1000)
+                        }));
+        genericRow.setField(
+                11,
+                new GenericArray(
+                        new Object[] {"hello".getBytes(), "world".getBytes(), 
"test".getBytes()}));
+        genericRow.setField(
+                12,
+                new GenericArray(
+                        new Object[] {
+                            new GenericArray(new int[] {1, 2}),
+                            new GenericArray(new int[] {3, 4, 5})
+                        }));
+        genericRow.setField(13, new GenericArray(new Object[] {1, null, 3})); // null element
+        genericRow.setField(14, null); // the array value itself is null
+
+        FlussRowAsIcebergRecord record = new 
FlussRowAsIcebergRecord(structType, flussRowType);
+        record.internalRow = genericRow; // binds the row by assigning the field directly
+
+        // Field 0 (BOOLEAN array): size and each element are checked in order
+        List<?> boolArray = (List<?>) record.get(0);
+        assertThat(boolArray.size()).isEqualTo(3);
+        assertThat(boolArray.get(0)).isEqualTo(true);
+        assertThat(boolArray.get(1)).isEqualTo(false);
+        assertThat(boolArray.get(2)).isEqualTo(true);
+
+        // Field 1 (TINYINT array, Iceberg int list): elements are compared with int literals
+        List<?> byteArray = (List<?>) record.get(1);
+        assertThat(byteArray.size()).isEqualTo(3);
+        assertThat(byteArray.get(0)).isEqualTo(1);
+        assertThat(byteArray.get(1)).isEqualTo(2);
+        assertThat(byteArray.get(2)).isEqualTo(3);
+
+        // Field 2 (SMALLINT array, Iceberg int list): elements are compared with int literals
+        List<?> shortArray = (List<?>) record.get(2);
+        assertThat(shortArray.size()).isEqualTo(3);
+        assertThat(shortArray.get(0)).isEqualTo(100);
+        assertThat(shortArray.get(1)).isEqualTo(200);
+        assertThat(shortArray.get(2)).isEqualTo(300);
+
+        // Field 3 (INT array)
+        List<?> intArray = (List<?>) record.get(3);
+        assertThat(intArray.size()).isEqualTo(3);
+        assertThat(intArray.get(0)).isEqualTo(1000);
+        assertThat(intArray.get(1)).isEqualTo(2000);
+        assertThat(intArray.get(2)).isEqualTo(3000);
+
+        // Field 4 (BIGINT array): elements are compared with long literals
+        List<?> longArray = (List<?>) record.get(4);
+        assertThat(longArray.size()).isEqualTo(3);
+        assertThat(longArray.get(0)).isEqualTo(10000L);
+        assertThat(longArray.get(1)).isEqualTo(20000L);
+        assertThat(longArray.get(2)).isEqualTo(30000L);
+
+        // Field 5 (FLOAT array): exact equality with the float literals that were stored
+        List<?> floatArray = (List<?>) record.get(5);
+        assertThat(floatArray.size()).isEqualTo(3);
+        assertThat(floatArray.get(0)).isEqualTo(1.1f);
+        assertThat(floatArray.get(1)).isEqualTo(2.2f);
+        assertThat(floatArray.get(2)).isEqualTo(3.3f);
+
+        // Field 6 (DOUBLE array): exact equality with the double literals that were stored
+        List<?> doubleArray = (List<?>) record.get(6);
+        assertThat(doubleArray.size()).isEqualTo(3);
+        assertThat(doubleArray.get(0)).isEqualTo(1.11);
+        assertThat(doubleArray.get(1)).isEqualTo(2.22);
+        assertThat(doubleArray.get(2)).isEqualTo(3.33);
+
+        // Field 7 (STRING array): BinaryString inputs must compare equal to plain Java Strings
+        List<?> stringArray = (List<?>) record.get(7);
+        assertThat(stringArray.size()).isEqualTo(3);
+        assertThat(stringArray.get(0)).isEqualTo("hello");
+        assertThat(stringArray.get(1)).isEqualTo("world");
+        assertThat(stringArray.get(2)).isEqualTo("test");
+
+        // Field 8 (DECIMAL(10, 2) array): elements must equal BigDecimal values of the same scale
+        List<?> decimalArray = (List<?>) record.get(8);
+        assertThat(decimalArray.size()).isEqualTo(2);
+        assertThat(decimalArray.get(0)).isEqualTo(new BigDecimal("123.45"));
+        assertThat(decimalArray.get(1)).isEqualTo(new BigDecimal("678.90"));
+
+        // Field 9 (TIMESTAMP(6) array): inputs use LocalDateTime.now(); only the type is checked
+        List<?> timestampNtzArray = (List<?>) record.get(9);
+        assertThat(timestampNtzArray).isNotNull();
+        assertThat(timestampNtzArray.size()).isEqualTo(2);
+        assertThat(timestampNtzArray.get(0)).isInstanceOf(LocalDateTime.class);
+        assertThat(timestampNtzArray.get(1)).isInstanceOf(LocalDateTime.class);
+
+        // Field 10 (TIMESTAMP_LTZ(6) array): inputs use the current clock; only the type is checked
+        List<?> timestampLtzArray = (List<?>) record.get(10);
+        assertThat(timestampLtzArray).isNotNull();
+        assertThat(timestampLtzArray.size()).isEqualTo(2);
+        
assertThat(timestampLtzArray.get(0)).isInstanceOf(OffsetDateTime.class);
+        
assertThat(timestampLtzArray.get(1)).isInstanceOf(OffsetDateTime.class);
+
+        // Field 11 (BYTES array): ByteBuffer elements; array() reads each buffer's backing array
+        List<?> binaryArray = (List<?>) record.get(11);
+        assertThat(binaryArray).isNotNull();
+        assertThat(binaryArray.size()).isEqualTo(3);
+        assertThat(binaryArray.get(0)).isInstanceOf(ByteBuffer.class);
+        assertThat(((ByteBuffer) 
binaryArray.get(0)).array()).isEqualTo("hello".getBytes());
+        assertThat(((ByteBuffer) 
binaryArray.get(1)).array()).isEqualTo("world".getBytes());
+        assertThat(((ByteBuffer) 
binaryArray.get(2)).array()).isEqualTo("test".getBytes());
+
+        // Field 12 (array<array<int>>): the outer list holds inner lists of different lengths
+        List<?> outerArray = (List<?>) record.get(12);
+        assertThat(outerArray).isNotNull();
+        assertThat(outerArray.size()).isEqualTo(2);
+
+        List<?> innerArray1 = (List<?>) outerArray.get(0);
+        assertThat(innerArray1.size()).isEqualTo(2);
+        assertThat(innerArray1.get(0)).isEqualTo(1);
+        assertThat(innerArray1.get(1)).isEqualTo(2);
+
+        List<?> innerArray2 = (List<?>) outerArray.get(1);
+        assertThat(innerArray2.size()).isEqualTo(3);
+        assertThat(innerArray2.get(0)).isEqualTo(3);
+        assertThat(innerArray2.get(1)).isEqualTo(4);
+        assertThat(innerArray2.get(2)).isEqualTo(5);
+
+        // Field 13 (INT array with optional elements): the null element must be preserved
+        List<?> nullableArray = (List<?>) record.get(13);
+        assertThat(nullableArray).isNotNull();
+        assertThat(nullableArray.size()).isEqualTo(3);
+        assertThat(nullableArray.get(0)).isEqualTo(1);
+        assertThat(nullableArray.get(1)).isNull();
+        assertThat(nullableArray.get(2)).isEqualTo(3);
+
+        // Field 14 (optional field): a null array value must be returned as null
+        assertThat(record.get(14)).isNull();
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index ba59b1b74..6443e8529 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -347,7 +347,7 @@ class IcebergTieringTest {
     }
 
     private GenericRecord toRecord(long offset, GenericRow row, ChangeType 
changeType) {
-        return new GenericRecord(offset, System.currentTimeMillis(), 
changeType, row);
+        return new GenericRecord(offset, 1000000000L + offset, changeType, 
row);
     }
 
     private void createTable(
diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md 
b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
index cd2926caf..8cadf2c2a 100644
--- a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
+++ b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
@@ -500,6 +500,7 @@ When integrating with Iceberg, Fluss automatically converts 
between Fluss data t
 | TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone)     |              
       |
 | BINARY                        | BINARY                        |              
       |
 | BYTES                         | BINARY                        | Converted to 
BINARY |
+| ARRAY                         | LIST                          |              
       |
 
 
 ## Maintenance and Optimization


Reply via email to