This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ed1e8f8ef [lake/iceberg] Support MAP type in Iceberg tables (#2367)
ed1e8f8ef is described below

commit ed1e8f8ef55011d30bb12e578e4a6872812a393d
Author: HONGGEUN JI <[email protected]>
AuthorDate: Mon Feb 2 01:01:13 2026 -0500

    [lake/iceberg] Support MAP type in Iceberg tables (#2367)
---
 .../row/encode/iceberg/IcebergBinaryRowWriter.java |  15 ++
 .../iceberg/FlussDataTypeToIcebergDataType.java    |  14 +-
 .../iceberg/source/FlussArrayAsIcebergList.java    |   9 ++
 .../lake/iceberg/source/FlussMapAsIcebergMap.java  | 180 +++++++++++++++++++++
 .../iceberg/source/FlussRowAsIcebergRecord.java    |  11 ++
 .../iceberg/source/IcebergArrayAsFlussArray.java   |   4 +-
 .../lake/iceberg/source/IcebergMapAsFlussMap.java  |  50 ++++++
 .../iceberg/source/IcebergRecordAsFlussRow.java    |   9 +-
 .../flink/FlinkUnionReadLogTableITCase.java        |  29 +++-
 .../source/FlussRowAsIcebergRecordTest.java        | 150 +++++++++++++++++
 .../lake/iceberg/source/IcebergLakeSourceTest.java |   7 +-
 .../source/IcebergRecordAsFlussRowTest.java        |  60 ++++++-
 .../encode/iceberg/IcebergBinaryRowWriterTest.java |  59 +++++++
 .../integrate-data-lakes/iceberg.md                |  43 +++--
 14 files changed, 600 insertions(+), 40 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
index 9074759bb..433f36452 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
@@ -196,6 +196,13 @@ class IcebergBinaryRowWriter {
 
             case BIGINT:
                 return (writer, value) -> writer.writeLong((long) value);
+
+            case FLOAT:
+                return (writer, value) -> writer.writeFloat((float) value);
+
+            case DOUBLE:
+                return (writer, value) -> writer.writeDouble((double) value);
+
                 // support for nanoseconds come check again after #1195 merge
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 return (writer, value) -> {
@@ -215,6 +222,14 @@ class IcebergBinaryRowWriter {
             case BYTES:
                 return (writer, value) -> writer.writeBytes((byte[]) value, 
true);
 
+            case ARRAY:
+                throw new IllegalArgumentException(
+                        "Array types cannot be used as bucket keys. Bucket 
keys must be scalar types.");
+
+            case MAP:
+                throw new IllegalArgumentException(
+                        "Map types cannot be used as bucket keys. Bucket keys 
must be scalar types.");
+
             default:
                 throw new IllegalArgumentException(
                         "Unsupported type for Iceberg binary row writer: " + 
fieldType);
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index bd01d08af..fb8f81957 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -167,7 +167,19 @@ public class FlussDataTypeToIcebergDataType implements 
DataTypeVisitor<Type> {
 
     @Override
     public Type visit(MapType mapType) {
-        throw new UnsupportedOperationException("Unsupported map type");
+        // According to the Iceberg spec,
+        // the key and value fields of a map should have consecutive IDs
+        int keyFieldId = getNextId();
+        int valueFieldId = getNextId();
+
+        Type keyType = mapType.getKeyType().accept(this);
+        Type valueType = mapType.getValueType().accept(this);
+
+        if (mapType.getValueType().isNullable()) {
+            return Types.MapType.ofOptional(keyFieldId, valueFieldId, keyType, 
valueType);
+        } else {
+            return Types.MapType.ofRequired(keyFieldId, valueFieldId, keyType, 
valueType);
+        }
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
index 2671e045c..285c8b1c5 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.iceberg.source;
 
 import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
 import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BigIntType;
 import org.apache.fluss.types.BinaryType;
@@ -31,6 +32,7 @@ import org.apache.fluss.types.DoubleType;
 import org.apache.fluss.types.FloatType;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.MapType;
 import org.apache.fluss.types.SmallIntType;
 import org.apache.fluss.types.StringType;
 import org.apache.fluss.types.TimeType;
@@ -104,6 +106,13 @@ public class FlussArrayAsIcebergList extends 
AbstractList<Object> {
                     ? null
                     : new FlussArrayAsIcebergList(
                             innerArray, ((ArrayType) 
elementType).getElementType());
+        } else if (elementType instanceof MapType) {
+            MapType mapType = (MapType) elementType;
+            InternalMap internalMap = flussArray.getMap(index);
+            return internalMap == null
+                    ? null
+                    : new FlussMapAsIcebergMap(
+                            internalMap, mapType.getKeyType(), 
mapType.getValueType());
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported array element type conversion for Fluss type: 
"
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
new file mode 100644
index 000000000..de05af5d5
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.DateTimeUtils;
+
+import org.apache.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/** Adapter class for converting Fluss InternalMap to a Java Map for Iceberg. 
*/
+public class FlussMapAsIcebergMap extends AbstractMap<Object, Object> {
+
+    private final InternalMap flussMap;
+    private final DataType keyType;
+    private final DataType valueType;
+
+    public FlussMapAsIcebergMap(InternalMap flussMap, DataType keyType, 
DataType valueType) {
+        this.flussMap = flussMap;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    @Override
+    public int size() {
+        return flussMap.size();
+    }
+
+    @Override
+    public Set<Entry<Object, Object>> entrySet() {
+        return new AbstractSet<>() {
+            @Override
+            public Iterator<Entry<Object, Object>> iterator() {
+                return new Iterator<>() {
+                    private final InternalArray keyArray = flussMap.keyArray();
+                    private final InternalArray valueArray = 
flussMap.valueArray();
+                    private final int size = flussMap.size();
+                    private int currentIndex = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return currentIndex < size;
+                    }
+
+                    @Override
+                    public Entry<Object, Object> next() {
+                        Object key = convertElement(keyArray, currentIndex, 
keyType);
+                        Object value = convertElement(valueArray, 
currentIndex, valueType);
+                        currentIndex++;
+                        return new AbstractMap.SimpleEntry<>(key, value);
+                    }
+                };
+            }
+
+            @Override
+            public int size() {
+                return flussMap.size();
+            }
+        };
+    }
+
+    private Object convertElement(InternalArray array, int index, DataType 
elementType) {
+        if (array.isNullAt(index)) {
+            return null;
+        }
+
+        if (elementType instanceof BooleanType) {
+            return array.getBoolean(index);
+        } else if (elementType instanceof TinyIntType) {
+            return (int) array.getByte(index);
+        } else if (elementType instanceof SmallIntType) {
+            return (int) array.getShort(index);
+        } else if (elementType instanceof IntType) {
+            return array.getInt(index);
+        } else if (elementType instanceof BigIntType) {
+            return array.getLong(index);
+        } else if (elementType instanceof FloatType) {
+            return array.getFloat(index);
+        } else if (elementType instanceof DoubleType) {
+            return array.getDouble(index);
+        } else if (elementType instanceof StringType) {
+            return array.getString(index).toString();
+        } else if (elementType instanceof CharType) {
+            CharType charType = (CharType) elementType;
+            return array.getChar(index, charType.getLength()).toString();
+        } else if (elementType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) elementType;
+            return array.getDecimal(index, decimalType.getPrecision(), 
decimalType.getScale())
+                    .toBigDecimal();
+        } else if (elementType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) 
elementType;
+            return toIcebergTimestampLtz(
+                    array.getTimestampLtz(index, 
ltzType.getPrecision()).toInstant());
+        } else if (elementType instanceof TimestampType) {
+            TimestampType tsType = (TimestampType) elementType;
+            return array.getTimestampNtz(index, 
tsType.getPrecision()).toLocalDateTime();
+        } else if (elementType instanceof DateType) {
+            return DateTimeUtils.toLocalDate(array.getInt(index));
+        } else if (elementType instanceof TimeType) {
+            return DateTimeUtils.toLocalTime(array.getInt(index));
+        } else if (elementType instanceof BytesType || elementType instanceof 
BinaryType) {
+            return ByteBuffer.wrap(array.getBytes(index));
+        } else if (elementType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) elementType;
+            InternalArray internalArray = array.getArray(index);
+            return internalArray == null
+                    ? null
+                    : new FlussArrayAsIcebergList(internalArray, 
arrayType.getElementType());
+        } else if (elementType instanceof MapType) {
+            MapType mapType = (MapType) elementType;
+            InternalMap internalMap = array.getMap(index);
+            return internalMap == null
+                    ? null
+                    : new FlussMapAsIcebergMap(
+                            internalMap, mapType.getKeyType(), 
mapType.getValueType());
+        } else if (elementType instanceof RowType) {
+            RowType rowType = (RowType) elementType;
+            Types.StructType nestedStructType =
+                    (Types.StructType) 
rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
+            InternalRow internalRow = array.getRow(index, 
rowType.getFieldCount());
+            return internalRow == null
+                    ? null
+                    : new FlussRowAsIcebergRecord(nestedStructType, rowType, 
internalRow);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported array element type conversion for Fluss type: 
"
+                            + elementType.getClass().getSimpleName());
+        }
+    }
+
+    private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
+        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
index 6d7070001..449dfc31f 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg.source;
 
 import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
 import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BigIntType;
@@ -33,6 +34,7 @@ import org.apache.fluss.types.DoubleType;
 import org.apache.fluss.types.FloatType;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.MapType;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.types.SmallIntType;
 import org.apache.fluss.types.StringType;
@@ -189,6 +191,15 @@ public class FlussRowAsIcebergRecord implements Record {
                 InternalRow nestedRow = row.getRow(pos, 
rowType.getFieldCount());
                 return new FlussRowAsIcebergRecord(nestedStructType, rowType, 
nestedRow);
             };
+        } else if (flussType instanceof MapType) {
+            MapType mapType = (MapType) flussType;
+            return row -> {
+                InternalMap map = row.getMap(pos);
+                return map == null
+                        ? null
+                        : new FlussMapAsIcebergMap(
+                                map, mapType.getKeyType(), 
mapType.getValueType());
+            };
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported data type conversion for Fluss type: "
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
index e8a4b1914..fd459c218 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.util.List;
+import java.util.Map;
 
 /** Adapter for Iceberg List as Fluss InternalArray. */
 public class IcebergArrayAsFlussArray implements InternalArray {
@@ -139,7 +140,8 @@ public class IcebergArrayAsFlussArray implements 
InternalArray {
 
     @Override
     public InternalMap getMap(int pos) {
-        throw new UnsupportedOperationException();
+        Map<?, ?> nestedMap = (Map<?, ?>) icebergList.get(pos);
+        return nestedMap == null ? null : new IcebergMapAsFlussMap(nestedMap);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergMapAsFlussMap.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergMapAsFlussMap.java
new file mode 100644
index 000000000..738e953ed
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergMapAsFlussMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/** Adapter for Iceberg Map as Fluss InternalMap. */
+public class IcebergMapAsFlussMap implements InternalMap {
+
+    private final Map<?, ?> icebergMap;
+
+    public IcebergMapAsFlussMap(Map<?, ?> icebergMap) {
+        this.icebergMap = icebergMap;
+    }
+
+    @Override
+    public int size() {
+        return icebergMap.size();
+    }
+
+    @Override
+    public InternalArray keyArray() {
+        return new IcebergArrayAsFlussArray(new 
ArrayList<>(icebergMap.keySet()));
+    }
+
+    @Override
+    public InternalArray valueArray() {
+        return new IcebergArrayAsFlussArray(new 
ArrayList<>(icebergMap.values()));
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 2a560e8d1..4aca5cb08 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
 
@@ -167,8 +168,12 @@ public class IcebergRecordAsFlussRow implements 
InternalRow {
 
     @Override
     public InternalMap getMap(int pos) {
-        // TODO: Support Map type conversion from Iceberg to Fluss
-        throw new UnsupportedOperationException();
+        Object value = icebergRecord.get(pos);
+        if (value == null) {
+            return null;
+        }
+        Map<?, ?> icebergMap = (Map<?, ?>) value;
+        return new IcebergMapAsFlussMap(icebergMap);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index ef353d989..15774d4f7 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericMap;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -49,6 +51,7 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -126,12 +129,12 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
             // check filter push down
             assertThat(plan)
                     .contains("TableSourceScan(")
-                    .contains("LogicalFilter(condition=[=($16, _UTF-16LE'" + 
partition + "'")
+                    .contains("LogicalFilter(condition=[=($17, _UTF-16LE'" + 
partition + "'")
                     .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
 
             List<Row> expectedFiltered =
                     writtenRows.stream()
-                            .filter(r -> partition.equals(r.getField(16)))
+                            .filter(r -> partition.equals(r.getField(17)))
                             .collect(Collectors.toList());
 
             List<Row> actualFiltered =
@@ -300,7 +303,8 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                 "f_row",
                                 DataTypes.ROW(
                                         DataTypes.FIELD("f_nested_int", 
DataTypes.INT()),
-                                        DataTypes.FIELD("f_nested_string", 
DataTypes.STRING())));
+                                        DataTypes.FIELD("f_nested_string", 
DataTypes.STRING())))
+                        .column("f_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()));
 
         TableDescriptor.Builder tableBuilder =
                 TableDescriptor.builder()
@@ -330,6 +334,17 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         List<InternalRow> rows = new ArrayList<>();
         List<Row> flinkRows = new ArrayList<>();
         for (int i = 0; i < rowCount; i++) {
+            // Map for Fluss InternalRow
+            Map<Object, Object> mapData = new HashMap<>();
+            mapData.put(BinaryString.fromString("key1"), 100 + i);
+            mapData.put(BinaryString.fromString("key2"), 200 + i);
+            GenericMap map = new GenericMap(mapData);
+
+            // Map for FlinkRow
+            Map<String, Integer> flinkMap = new HashMap<>();
+            flinkMap.put("key1", 100 + i);
+            flinkMap.put("key2", 200 + i);
+
             if (partition == null) {
                 rows.add(
                         row(
@@ -348,7 +363,8 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                 TimestampNtz.fromMillis(1698235273501L),
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
-                                row(10, "nested_string")));
+                                row(10, "nested_string"),
+                                map));
 
                 flinkRows.add(
                         Row.of(
@@ -371,7 +387,8 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                                 ZoneId.of("UTC"))
                                         .plusNanos(8000),
                                 new byte[] {5, 6, 7, 8},
-                                Row.of(10, "nested_string")));
+                                Row.of(10, "nested_string"),
+                                flinkMap));
             } else {
                 rows.add(
                         row(
@@ -391,6 +408,7 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                 TimestampNtz.fromMillis(1698235273501L, 8000),
                                 new byte[] {5, 6, 7, 8},
                                 row(10, "nested_string"),
+                                map,
                                 partition));
 
                 flinkRows.add(
@@ -415,6 +433,7 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                         .plusNanos(8000),
                                 new byte[] {5, 6, 7, 8},
                                 Row.of(10, "nested_string"),
+                                flinkMap,
                                 partition));
             }
         }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
index 247c33d12..720bc2991 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.iceberg.source;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -413,4 +414,153 @@ class FlussRowAsIcebergRecordTest {
         // Verify Nullable Row
         assertThat(record.get(3)).isNull();
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    void testMapType() {
+        Types.StructType structType =
+                Types.StructType.of(
+                        // simple map
+                        Types.NestedField.required(
+                                0,
+                                "simple_map",
+                                Types.MapType.ofRequired(
+                                        1, 2, Types.StringType.get(), 
Types.IntegerType.get())),
+                        // nested map
+                        Types.NestedField.required(
+                                3,
+                                "nested_map",
+                                Types.MapType.ofRequired(
+                                        4,
+                                        5,
+                                        Types.StringType.get(),
+                                        Types.MapType.ofRequired(
+                                                6,
+                                                7,
+                                                Types.StringType.get(),
+                                                Types.IntegerType.get()))),
+                        // map in array
+                        Types.NestedField.required(
+                                8,
+                                "map_array",
+                                Types.ListType.ofRequired(
+                                        9,
+                                        Types.MapType.ofRequired(
+                                                10,
+                                                11,
+                                                Types.StringType.get(),
+                                                Types.IntegerType.get()))),
+                        // nullable map
+                        Types.NestedField.optional(
+                                12,
+                                "nullable_map",
+                                Types.MapType.ofRequired(
+                                        13, 14, Types.StringType.get(), 
Types.IntegerType.get())));
+
+        RowType flussRowType =
+                RowType.of(
+                        // simple map
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                        // nested map
+                        DataTypes.MAP(
+                                DataTypes.STRING(),
+                                DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT())),
+                        // map in array
+                        DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT())),
+                        // nullable map
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        GenericRow genericRow = new GenericRow(4);
+
+        // Simple Map
+        GenericMap simpleMap =
+                new GenericMap(
+                        new java.util.HashMap<Object, Object>() {
+                            {
+                                put(BinaryString.fromString("key1"), 100);
+                                put(BinaryString.fromString("key2"), 200);
+                            }
+                        });
+        genericRow.setField(0, simpleMap);
+
+        // Nested Map
+        GenericMap innerMap1 =
+                new GenericMap(
+                        new java.util.HashMap<Object, Object>() {
+                            {
+                                put(BinaryString.fromString("inner_a"), 1);
+                                put(BinaryString.fromString("inner_b"), 2);
+                            }
+                        });
+        GenericMap innerMap2 =
+                new GenericMap(
+                        new java.util.HashMap<Object, Object>() {
+                            {
+                                put(BinaryString.fromString("inner_c"), 3);
+                            }
+                        });
+        GenericMap nestedMap =
+                new GenericMap(
+                        new java.util.HashMap<Object, Object>() {
+                            {
+                                put(BinaryString.fromString("outer_1"), 
innerMap1);
+                                put(BinaryString.fromString("outer_2"), 
innerMap2);
+                            }
+                        });
+        genericRow.setField(1, nestedMap);
+
+        // Map in Array
+        GenericMap arrayMap1 =
+                new GenericMap(
+                        new java.util.HashMap<Object, Object>() {
+                            {
+                                put(BinaryString.fromString("a"), 10);
+                            }
+                        });
+        GenericMap arrayMap2 =
+                new GenericMap(
+                        new java.util.HashMap<Object, Object>() {
+                            {
+                                put(BinaryString.fromString("b"), 20);
+                            }
+                        });
+        genericRow.setField(2, new GenericArray(new Object[] {arrayMap1, 
arrayMap2}));
+
+        // Nullable Map
+        genericRow.setField(3, null);
+
+        FlussRowAsIcebergRecord record = new 
FlussRowAsIcebergRecord(structType, flussRowType);
+        record.internalRow = genericRow;
+
+        // Verify Simple Map
+        java.util.Map<Object, Object> simpleMapResult =
+                (java.util.Map<Object, Object>) record.get(0);
+        assertThat(simpleMapResult).isNotNull();
+        assertThat(simpleMapResult).containsEntry("key1", 
100).containsEntry("key2", 200);
+
+        // Verify Nested Map
+        java.util.Map<Object, Object> nestedMapResult =
+                (java.util.Map<Object, Object>) record.get(1);
+        assertThat(nestedMapResult).isNotNull();
+        assertThat(nestedMapResult).hasSize(2);
+        java.util.Map<Object, Object> inner1 =
+                (java.util.Map<Object, Object>) nestedMapResult.get("outer_1");
+        assertThat(inner1).containsEntry("inner_a", 
1).containsEntry("inner_b", 2);
+        java.util.Map<Object, Object> inner2 =
+                (java.util.Map<Object, Object>) nestedMapResult.get("outer_2");
+        assertThat(inner2).containsEntry("inner_c", 3);
+
+        // Verify Map in Array
+        List<?> mapArrayResult = (List<?>) record.get(2);
+        assertThat(mapArrayResult).hasSize(2);
+        java.util.Map<Object, Object> firstArrayMap =
+                (java.util.Map<Object, Object>) mapArrayResult.get(0);
+        assertThat(firstArrayMap).containsEntry("a", 10);
+        java.util.Map<Object, Object> secondArrayMap =
+                (java.util.Map<Object, Object>) mapArrayResult.get(1);
+        assertThat(secondArrayMap).containsEntry("b", 20);
+
+        // Verify Nullable Map
+        assertThat(record.get(3)).isNull();
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
index 0366ee62d..d69def535 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.RowType;
@@ -111,8 +112,7 @@ class IcebergLakeSourceTest extends IcebergSourceTestBase {
         // test all filter can be accepted
         Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
         Predicate filter2 = FLUSS_BUILDER.lessOrEqual(0, 3);
-        Predicate filter3 =
-                FLUSS_BUILDER.startsWith(1, org.apache.fluss.row.BinaryString.fromString("name"));
+        Predicate filter3 = FLUSS_BUILDER.startsWith(1, BinaryString.fromString("name"));
         List<Predicate> allFilters = Arrays.asList(filter1, filter2, filter3);
 
         LakeSource<IcebergSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
@@ -141,8 +141,7 @@ class IcebergLakeSourceTest extends IcebergSourceTestBase {
         assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3, name3]]");
 
         // test mix one unaccepted filter
-        Predicate nonConvertibleFilter =
-                FLUSS_BUILDER.endsWith(1, org.apache.fluss.row.BinaryString.fromString("name"));
+        Predicate nonConvertibleFilter = FLUSS_BUILDER.endsWith(1, BinaryString.fromString("name"));
         allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
 
         filterPushDownResult = lakeSource.withFilters(allFilters);
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 2aeb223ce..80244d727 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
@@ -80,10 +82,15 @@ class IcebergRecordAsFlussRowTest {
                                                                 18,
                                                                 "subfield2",
                                                                 
Types.IntegerType.get()))))),
+                        optional(
+                                19,
+                                "map_field",
+                                Types.MapType.ofOptional(
+                                        20, 21, Types.StringType.get(), Types.IntegerType.get())),
                         // System columns
-                        required(19, "__bucket", Types.IntegerType.get()),
-                        required(20, "__offset", Types.LongType.get()),
-        required(21, "__timestamp", Types.TimestampType.withZone()));
+                        required(22, "__bucket", Types.IntegerType.get()),
+                        required(23, "__offset", Types.LongType.get()),
+        required(24, "__timestamp", Types.TimestampType.withZone()));
 
         record = GenericRecord.create(schema);
     }
@@ -101,7 +108,7 @@ class IcebergRecordAsFlussRowTest {
         icebergRecordAsFlussRow.replaceIcebergRecord(record);
 
         // Should return count excluding system columns (3 system columns)
-        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
     }
 
     @Test
@@ -164,7 +171,7 @@ class IcebergRecordAsFlussRowTest {
                 .isEqualTo("Hello"); // char_data
 
         // Test field count (excluding system columns)
-        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
     }
 
     @Test
@@ -195,4 +202,47 @@ class IcebergRecordAsFlussRowTest {
         assertThat(deepNestedRow.getString(0).toString()).isEqualTo(subfield1Value);
         assertThat(deepNestedRow.getInt(1)).isEqualTo(subfield2Value);
     }
+
+    @Test
+    void testMapType() {
+        // Create a simple map with String keys and Integer values
+        Map<String, Integer> mapData = new HashMap<>();
+        mapData.put("key1", 100);
+        mapData.put("key2", 200);
+        mapData.put("key3", 300);
+
+        record.setField("id", 1L);
+        record.setField("map_field", mapData);
+        // System columns
+        record.setField("__bucket", 1);
+        record.setField("__offset", 100L);
+        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
+
+        // Test map retrieval - map_field is at index 14
+        org.apache.fluss.row.InternalMap internalMap = icebergRecordAsFlussRow.getMap(14);
+        assertThat(internalMap).isNotNull();
+        assertThat(internalMap.size()).isEqualTo(3);
+
+        // Verify map contents by iterating through keys and values
+        org.apache.fluss.row.InternalArray keyArray = internalMap.keyArray();
+        org.apache.fluss.row.InternalArray valueArray = internalMap.valueArray();
+
+        Map<String, Integer> resultMap = new HashMap<>();
+        for (int i = 0; i < internalMap.size(); i++) {
+            String key = keyArray.getString(i).toString();
+            int value = valueArray.getInt(i);
+            resultMap.put(key, value);
+        }
+
+        assertThat(resultMap).containsEntry("key1", 100);
+        assertThat(resultMap).containsEntry("key2", 200);
+        assertThat(resultMap).containsEntry("key3", 300);
+
+        // Test null map
+        record.setField("map_field", null);
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
+        assertThat(icebergRecordAsFlussRow.isNullAt(14)).isTrue();
+    }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
index f7d39498f..81fe569ef 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
@@ -29,6 +29,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link IcebergBinaryRowWriter}. */
 class IcebergBinaryRowWriterTest {
@@ -218,4 +220,61 @@ class IcebergBinaryRowWriterTest {
         byte[] writerBytesWithoutPrefix = writer.toBytes();
         assertThat(writerBytesWithoutPrefix).isEqualTo(expectedBytes);
     }
+
+    @Test
+    void testCreateFieldWriterRejectsMapType() {
+        org.apache.fluss.types.MapType mapType =
+                org.apache.fluss.types.DataTypes.MAP(
+                        org.apache.fluss.types.DataTypes.STRING(),
+                        org.apache.fluss.types.DataTypes.INT());
+
+        assertThatThrownBy(() -> IcebergBinaryRowWriter.createFieldWriter(mapType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Map types cannot be used as bucket keys")
+                .hasMessageContaining("Bucket keys must be scalar types");
+    }
+
+    @Test
+    void testCreateFieldWriterRejectsArrayType() {
+        org.apache.fluss.types.ArrayType arrayType =
+                org.apache.fluss.types.DataTypes.ARRAY(org.apache.fluss.types.DataTypes.INT());
+
+        assertThatThrownBy(() -> IcebergBinaryRowWriter.createFieldWriter(arrayType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Array types cannot be used as bucket keys")
+                .hasMessageContaining("Bucket keys must be scalar types");
+    }
+
+    @Test
+    void testCreateFieldWriterWithValidScalarTypes() {
+        assertThatCode(
+                        () ->
+                                IcebergBinaryRowWriter.createFieldWriter(
+                                        org.apache.fluss.types.DataTypes.STRING()))
+                .doesNotThrowAnyException();
+
+        assertThatCode(
+                        () ->
+                                IcebergBinaryRowWriter.createFieldWriter(
+                                        org.apache.fluss.types.DataTypes.INT()))
+                .doesNotThrowAnyException();
+
+        assertThatCode(
+                        () ->
+                                IcebergBinaryRowWriter.createFieldWriter(
+                                        org.apache.fluss.types.DataTypes.BIGINT()))
+                .doesNotThrowAnyException();
+
+        assertThatCode(
+                        () ->
+                                IcebergBinaryRowWriter.createFieldWriter(
+                                        org.apache.fluss.types.DataTypes.DOUBLE()))
+                .doesNotThrowAnyException();
+
+        assertThatCode(
+                        () ->
+                                IcebergBinaryRowWriter.createFieldWriter(
+                                        org.apache.fluss.types.DataTypes.DECIMAL(10, 2)))
+                .doesNotThrowAnyException();
+    }
 }
diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
index cf3b3c2ba..b28191b05 100644
--- a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
+++ b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
@@ -482,27 +482,27 @@ SELECT * FROM iceberg_catalog.fluss.orders WHERE __bucket = 1 AND __offset >= 10
 
 When integrating with Iceberg, Fluss automatically converts between Fluss data types and Iceberg data types:
 
-| Fluss Data Type               | Iceberg Data Type             | Notes               |
-|-------------------------------|-------------------------------|---------------------|
-| BOOLEAN                       | BOOLEAN                       |                     |
-| TINYINT                       | INTEGER                       | Promoted to INT     |
-| SMALLINT                      | INTEGER                       | Promoted to INT     |
-| INT                           | INTEGER                       |                     |
-| BIGINT                        | LONG                          |                     |
-| FLOAT                         | FLOAT                         |                     |
-| DOUBLE                        | DOUBLE                        |                     |
-| DECIMAL                       | DECIMAL                       |                     |
-| STRING                        | STRING                        |                     |
-| CHAR                          | STRING                        | Converted to STRING |
-| DATE                          | DATE                          |                     |
-| TIME                          | TIME                          |                     |
-| TIMESTAMP                     | TIMESTAMP (without timezone)  |                     |
-| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone)     |                     |
-| BINARY                        | BINARY                        |                     |
-| BYTES                         | BINARY                        | Converted to BINARY |
-| ARRAY                         | LIST                          |                     |
-| ROW                           | STRUCT                        |                     |
-
+| Fluss Data Type               | Iceberg Data Type            | Notes               |
+|-------------------------------|------------------------------|---------------------|
+| BOOLEAN                       | BOOLEAN                      |                     |
+| TINYINT                       | INTEGER                      | Promoted to INT     |
+| SMALLINT                      | INTEGER                      | Promoted to INT     |
+| INT                           | INTEGER                      |                     |
+| BIGINT                        | LONG                         |                     |
+| FLOAT                         | FLOAT                        |                     |
+| DOUBLE                        | DOUBLE                       |                     |
+| DECIMAL                       | DECIMAL                      |                     |
+| STRING                        | STRING                       |                     |
+| CHAR                          | STRING                       | Converted to STRING |
+| DATE                          | DATE                         |                     |
+| TIME                          | TIME                         |                     |
+| TIMESTAMP                     | TIMESTAMP (without timezone) |                     |
+| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone)    |                     |
+| BINARY                        | BINARY                       |                     |
+| BYTES                         | BINARY                       | Converted to BINARY |
+| ARRAY                         | LIST                         |                     |
+| MAP                           | MAP                          |                     |
+| ROW                           | STRUCT                       |                     |
 
 ## Maintenance and Optimization
 
@@ -582,5 +582,4 @@ For partitioned tables, the metadata structure includes partition information:
 
 ## Current Limitations
 
-- **Complex Types**: Map type is not supported
 - **Multiple bucket keys**: Not supported until Iceberg implements multi-argument partition transforms

Reply via email to