This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ed1e8f8ef [lake/iceberg] Support MAP type in Iceberg tables (#2367)
ed1e8f8ef is described below
commit ed1e8f8ef55011d30bb12e578e4a6872812a393d
Author: HONGGEUN JI <[email protected]>
AuthorDate: Mon Feb 2 01:01:13 2026 -0500
[lake/iceberg] Support MAP type in Iceberg tables (#2367)
---
.../row/encode/iceberg/IcebergBinaryRowWriter.java | 15 ++
.../iceberg/FlussDataTypeToIcebergDataType.java | 14 +-
.../iceberg/source/FlussArrayAsIcebergList.java | 9 ++
.../lake/iceberg/source/FlussMapAsIcebergMap.java | 180 +++++++++++++++++++++
.../iceberg/source/FlussRowAsIcebergRecord.java | 11 ++
.../iceberg/source/IcebergArrayAsFlussArray.java | 4 +-
.../lake/iceberg/source/IcebergMapAsFlussMap.java | 50 ++++++
.../iceberg/source/IcebergRecordAsFlussRow.java | 9 +-
.../flink/FlinkUnionReadLogTableITCase.java | 29 +++-
.../source/FlussRowAsIcebergRecordTest.java | 150 +++++++++++++++++
.../lake/iceberg/source/IcebergLakeSourceTest.java | 7 +-
.../source/IcebergRecordAsFlussRowTest.java | 60 ++++++-
.../encode/iceberg/IcebergBinaryRowWriterTest.java | 59 +++++++
.../integrate-data-lakes/iceberg.md | 43 +++--
14 files changed, 600 insertions(+), 40 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
b/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
index 9074759bb..433f36452 100644
---
a/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
+++
b/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
@@ -196,6 +196,13 @@ class IcebergBinaryRowWriter {
case BIGINT:
return (writer, value) -> writer.writeLong((long) value);
+
+ case FLOAT:
+ return (writer, value) -> writer.writeFloat((float) value);
+
+ case DOUBLE:
+ return (writer, value) -> writer.writeDouble((double) value);
+
// supporting nanoseconds: re-check after #1195 is merged
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (writer, value) -> {
@@ -215,6 +222,14 @@ class IcebergBinaryRowWriter {
case BYTES:
return (writer, value) -> writer.writeBytes((byte[]) value,
true);
+ case ARRAY:
+ throw new IllegalArgumentException(
+ "Array types cannot be used as bucket keys. Bucket
keys must be scalar types.");
+
+ case MAP:
+ throw new IllegalArgumentException(
+ "Map types cannot be used as bucket keys. Bucket keys
must be scalar types.");
+
default:
throw new IllegalArgumentException(
"Unsupported type for Iceberg binary row writer: " +
fieldType);
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index bd01d08af..fb8f81957 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -167,7 +167,19 @@ public class FlussDataTypeToIcebergDataType implements
DataTypeVisitor<Type> {
@Override
public Type visit(MapType mapType) {
- throw new UnsupportedOperationException("Unsupported map type");
+ // According to the Iceberg spec,
+ // the key and value fields of a map should have consecutive IDs
+ int keyFieldId = getNextId();
+ int valueFieldId = getNextId();
+
+ Type keyType = mapType.getKeyType().accept(this);
+ Type valueType = mapType.getValueType().accept(this);
+
+ if (mapType.getValueType().isNullable()) {
+ return Types.MapType.ofOptional(keyFieldId, valueFieldId, keyType,
valueType);
+ } else {
+ return Types.MapType.ofRequired(keyFieldId, valueFieldId, keyType,
valueType);
+ }
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
index 2671e045c..285c8b1c5 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
@@ -18,6 +18,7 @@
package org.apache.fluss.lake.iceberg.source;
import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
@@ -31,6 +32,7 @@ import org.apache.fluss.types.DoubleType;
import org.apache.fluss.types.FloatType;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.MapType;
import org.apache.fluss.types.SmallIntType;
import org.apache.fluss.types.StringType;
import org.apache.fluss.types.TimeType;
@@ -104,6 +106,13 @@ public class FlussArrayAsIcebergList extends
AbstractList<Object> {
? null
: new FlussArrayAsIcebergList(
innerArray, ((ArrayType)
elementType).getElementType());
+ } else if (elementType instanceof MapType) {
+ MapType mapType = (MapType) elementType;
+ InternalMap internalMap = flussArray.getMap(index);
+ return internalMap == null
+ ? null
+ : new FlussMapAsIcebergMap(
+ internalMap, mapType.getKeyType(),
mapType.getValueType());
} else {
throw new UnsupportedOperationException(
"Unsupported array element type conversion for Fluss type:
"
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
new file mode 100644
index 000000000..de05af5d5
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.DateTimeUtils;
+
+import org.apache.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/** Adapter class for converting Fluss InternalMap to a Java Map for Iceberg.
*/
+public class FlussMapAsIcebergMap extends AbstractMap<Object, Object> {
+
+ private final InternalMap flussMap;
+ private final DataType keyType;
+ private final DataType valueType;
+
+ public FlussMapAsIcebergMap(InternalMap flussMap, DataType keyType,
DataType valueType) {
+ this.flussMap = flussMap;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ @Override
+ public int size() {
+ return flussMap.size();
+ }
+
+ @Override
+ public Set<Entry<Object, Object>> entrySet() {
+ return new AbstractSet<>() {
+ @Override
+ public Iterator<Entry<Object, Object>> iterator() {
+ return new Iterator<>() {
+ private final InternalArray keyArray = flussMap.keyArray();
+ private final InternalArray valueArray =
flussMap.valueArray();
+ private final int size = flussMap.size();
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < size;
+ }
+
+ @Override
+ public Entry<Object, Object> next() {
+ Object key = convertElement(keyArray, currentIndex,
keyType);
+ Object value = convertElement(valueArray,
currentIndex, valueType);
+ currentIndex++;
+ return new AbstractMap.SimpleEntry<>(key, value);
+ }
+ };
+ }
+
+ @Override
+ public int size() {
+ return flussMap.size();
+ }
+ };
+ }
+
+ private Object convertElement(InternalArray array, int index, DataType
elementType) {
+ if (array.isNullAt(index)) {
+ return null;
+ }
+
+ if (elementType instanceof BooleanType) {
+ return array.getBoolean(index);
+ } else if (elementType instanceof TinyIntType) {
+ return (int) array.getByte(index);
+ } else if (elementType instanceof SmallIntType) {
+ return (int) array.getShort(index);
+ } else if (elementType instanceof IntType) {
+ return array.getInt(index);
+ } else if (elementType instanceof BigIntType) {
+ return array.getLong(index);
+ } else if (elementType instanceof FloatType) {
+ return array.getFloat(index);
+ } else if (elementType instanceof DoubleType) {
+ return array.getDouble(index);
+ } else if (elementType instanceof StringType) {
+ return array.getString(index).toString();
+ } else if (elementType instanceof CharType) {
+ CharType charType = (CharType) elementType;
+ return array.getChar(index, charType.getLength()).toString();
+ } else if (elementType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) elementType;
+ return array.getDecimal(index, decimalType.getPrecision(),
decimalType.getScale())
+ .toBigDecimal();
+ } else if (elementType instanceof LocalZonedTimestampType) {
+ LocalZonedTimestampType ltzType = (LocalZonedTimestampType)
elementType;
+ return toIcebergTimestampLtz(
+ array.getTimestampLtz(index,
ltzType.getPrecision()).toInstant());
+ } else if (elementType instanceof TimestampType) {
+ TimestampType tsType = (TimestampType) elementType;
+ return array.getTimestampNtz(index,
tsType.getPrecision()).toLocalDateTime();
+ } else if (elementType instanceof DateType) {
+ return DateTimeUtils.toLocalDate(array.getInt(index));
+ } else if (elementType instanceof TimeType) {
+ return DateTimeUtils.toLocalTime(array.getInt(index));
+ } else if (elementType instanceof BytesType || elementType instanceof
BinaryType) {
+ return ByteBuffer.wrap(array.getBytes(index));
+ } else if (elementType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) elementType;
+ InternalArray internalArray = array.getArray(index);
+ return internalArray == null
+ ? null
+ : new FlussArrayAsIcebergList(internalArray,
arrayType.getElementType());
+ } else if (elementType instanceof MapType) {
+ MapType mapType = (MapType) elementType;
+ InternalMap internalMap = array.getMap(index);
+ return internalMap == null
+ ? null
+ : new FlussMapAsIcebergMap(
+ internalMap, mapType.getKeyType(),
mapType.getValueType());
+ } else if (elementType instanceof RowType) {
+ RowType rowType = (RowType) elementType;
+ Types.StructType nestedStructType =
+ (Types.StructType)
rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
+ InternalRow internalRow = array.getRow(index,
rowType.getFieldCount());
+ return internalRow == null
+ ? null
+ : new FlussRowAsIcebergRecord(nestedStructType, rowType,
internalRow);
+ } else {
+ throw new UnsupportedOperationException(
+                    "Unsupported map key/value type conversion for Fluss type: "
+ + elementType.getClass().getSimpleName());
+ }
+ }
+
+ private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
+ return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
index 6d7070001..449dfc31f 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg.source;
import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
@@ -33,6 +34,7 @@ import org.apache.fluss.types.DoubleType;
import org.apache.fluss.types.FloatType;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;
import org.apache.fluss.types.SmallIntType;
import org.apache.fluss.types.StringType;
@@ -189,6 +191,15 @@ public class FlussRowAsIcebergRecord implements Record {
InternalRow nestedRow = row.getRow(pos,
rowType.getFieldCount());
return new FlussRowAsIcebergRecord(nestedStructType, rowType,
nestedRow);
};
+ } else if (flussType instanceof MapType) {
+ MapType mapType = (MapType) flussType;
+ return row -> {
+ InternalMap map = row.getMap(pos);
+ return map == null
+ ? null
+ : new FlussMapAsIcebergMap(
+ map, mapType.getKeyType(),
mapType.getValueType());
+ };
} else {
throw new UnsupportedOperationException(
"Unsupported data type conversion for Fluss type: "
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
index e8a4b1914..fd459c218 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.List;
+import java.util.Map;
/** Adapter for Iceberg List as Fluss InternalArray. */
public class IcebergArrayAsFlussArray implements InternalArray {
@@ -139,7 +140,8 @@ public class IcebergArrayAsFlussArray implements
InternalArray {
@Override
public InternalMap getMap(int pos) {
- throw new UnsupportedOperationException();
+ Map<?, ?> nestedMap = (Map<?, ?>) icebergList.get(pos);
+ return nestedMap == null ? null : new IcebergMapAsFlussMap(nestedMap);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergMapAsFlussMap.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergMapAsFlussMap.java
new file mode 100644
index 000000000..738e953ed
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergMapAsFlussMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/** Adapter for Iceberg Map as Fluss InternalMap. */
+public class IcebergMapAsFlussMap implements InternalMap {
+
+ private final Map<?, ?> icebergMap;
+
+ public IcebergMapAsFlussMap(Map<?, ?> icebergMap) {
+ this.icebergMap = icebergMap;
+ }
+
+ @Override
+ public int size() {
+ return icebergMap.size();
+ }
+
+ @Override
+ public InternalArray keyArray() {
+ return new IcebergArrayAsFlussArray(new
ArrayList<>(icebergMap.keySet()));
+ }
+
+ @Override
+ public InternalArray valueArray() {
+ return new IcebergArrayAsFlussArray(new
ArrayList<>(icebergMap.values()));
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 2a560e8d1..4aca5cb08 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.List;
+import java.util.Map;
import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
@@ -167,8 +168,12 @@ public class IcebergRecordAsFlussRow implements
InternalRow {
@Override
public InternalMap getMap(int pos) {
- // TODO: Support Map type conversion from Iceberg to Fluss
- throw new UnsupportedOperationException();
+ Object value = icebergRecord.get(pos);
+ if (value == null) {
+ return null;
+ }
+ Map<?, ?> icebergMap = (Map<?, ?>) value;
+ return new IcebergMapAsFlussMap(icebergMap);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index ef353d989..15774d4f7 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericMap;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -49,6 +51,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -126,12 +129,12 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
// check filter push down
assertThat(plan)
.contains("TableSourceScan(")
- .contains("LogicalFilter(condition=[=($16, _UTF-16LE'" +
partition + "'")
+ .contains("LogicalFilter(condition=[=($17, _UTF-16LE'" +
partition + "'")
.contains("filter=[=(p, _UTF-16LE'" + partition + "'");
List<Row> expectedFiltered =
writtenRows.stream()
- .filter(r -> partition.equals(r.getField(16)))
+ .filter(r -> partition.equals(r.getField(17)))
.collect(Collectors.toList());
List<Row> actualFiltered =
@@ -300,7 +303,8 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
"f_row",
DataTypes.ROW(
DataTypes.FIELD("f_nested_int",
DataTypes.INT()),
- DataTypes.FIELD("f_nested_string",
DataTypes.STRING())));
+ DataTypes.FIELD("f_nested_string",
DataTypes.STRING())))
+ .column("f_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()));
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -330,6 +334,17 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
List<InternalRow> rows = new ArrayList<>();
List<Row> flinkRows = new ArrayList<>();
for (int i = 0; i < rowCount; i++) {
+ // Map for Fluss InternalRow
+ Map<Object, Object> mapData = new HashMap<>();
+ mapData.put(BinaryString.fromString("key1"), 100 + i);
+ mapData.put(BinaryString.fromString("key2"), 200 + i);
+ GenericMap map = new GenericMap(mapData);
+
+ // Map for FlinkRow
+ Map<String, Integer> flinkMap = new HashMap<>();
+ flinkMap.put("key1", 100 + i);
+ flinkMap.put("key2", 200 + i);
+
if (partition == null) {
rows.add(
row(
@@ -348,7 +363,8 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
- row(10, "nested_string")));
+ row(10, "nested_string"),
+ map));
flinkRows.add(
Row.of(
@@ -371,7 +387,8 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
ZoneId.of("UTC"))
.plusNanos(8000),
new byte[] {5, 6, 7, 8},
- Row.of(10, "nested_string")));
+ Row.of(10, "nested_string"),
+ flinkMap));
} else {
rows.add(
row(
@@ -391,6 +408,7 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
row(10, "nested_string"),
+ map,
partition));
flinkRows.add(
@@ -415,6 +433,7 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
.plusNanos(8000),
new byte[] {5, 6, 7, 8},
Row.of(10, "nested_string"),
+ flinkMap,
partition));
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
index 247c33d12..720bc2991 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.iceberg.source;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
@@ -413,4 +414,153 @@ class FlussRowAsIcebergRecordTest {
// Verify Nullable Row
assertThat(record.get(3)).isNull();
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ void testMapType() {
+ Types.StructType structType =
+ Types.StructType.of(
+ // simple map
+ Types.NestedField.required(
+ 0,
+ "simple_map",
+ Types.MapType.ofRequired(
+ 1, 2, Types.StringType.get(),
Types.IntegerType.get())),
+ // nested map
+ Types.NestedField.required(
+ 3,
+ "nested_map",
+ Types.MapType.ofRequired(
+ 4,
+ 5,
+ Types.StringType.get(),
+ Types.MapType.ofRequired(
+ 6,
+ 7,
+ Types.StringType.get(),
+ Types.IntegerType.get()))),
+ // map in array
+ Types.NestedField.required(
+ 8,
+ "map_array",
+ Types.ListType.ofRequired(
+ 9,
+ Types.MapType.ofRequired(
+ 10,
+ 11,
+ Types.StringType.get(),
+ Types.IntegerType.get()))),
+ // nullable map
+ Types.NestedField.optional(
+ 12,
+ "nullable_map",
+ Types.MapType.ofRequired(
+ 13, 14, Types.StringType.get(),
Types.IntegerType.get())));
+
+ RowType flussRowType =
+ RowType.of(
+ // simple map
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ // nested map
+ DataTypes.MAP(
+ DataTypes.STRING(),
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT())),
+ // map in array
+ DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT())),
+ // nullable map
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+ GenericRow genericRow = new GenericRow(4);
+
+ // Simple Map
+ GenericMap simpleMap =
+ new GenericMap(
+ new java.util.HashMap<Object, Object>() {
+ {
+ put(BinaryString.fromString("key1"), 100);
+ put(BinaryString.fromString("key2"), 200);
+ }
+ });
+ genericRow.setField(0, simpleMap);
+
+ // Nested Map
+ GenericMap innerMap1 =
+ new GenericMap(
+ new java.util.HashMap<Object, Object>() {
+ {
+ put(BinaryString.fromString("inner_a"), 1);
+ put(BinaryString.fromString("inner_b"), 2);
+ }
+ });
+ GenericMap innerMap2 =
+ new GenericMap(
+ new java.util.HashMap<Object, Object>() {
+ {
+ put(BinaryString.fromString("inner_c"), 3);
+ }
+ });
+ GenericMap nestedMap =
+ new GenericMap(
+ new java.util.HashMap<Object, Object>() {
+ {
+ put(BinaryString.fromString("outer_1"),
innerMap1);
+ put(BinaryString.fromString("outer_2"),
innerMap2);
+ }
+ });
+ genericRow.setField(1, nestedMap);
+
+ // Map in Array
+ GenericMap arrayMap1 =
+ new GenericMap(
+ new java.util.HashMap<Object, Object>() {
+ {
+ put(BinaryString.fromString("a"), 10);
+ }
+ });
+ GenericMap arrayMap2 =
+ new GenericMap(
+ new java.util.HashMap<Object, Object>() {
+ {
+ put(BinaryString.fromString("b"), 20);
+ }
+ });
+ genericRow.setField(2, new GenericArray(new Object[] {arrayMap1,
arrayMap2}));
+
+ // Nullable Map
+ genericRow.setField(3, null);
+
+ FlussRowAsIcebergRecord record = new
FlussRowAsIcebergRecord(structType, flussRowType);
+ record.internalRow = genericRow;
+
+ // Verify Simple Map
+ java.util.Map<Object, Object> simpleMapResult =
+ (java.util.Map<Object, Object>) record.get(0);
+ assertThat(simpleMapResult).isNotNull();
+ assertThat(simpleMapResult).containsEntry("key1",
100).containsEntry("key2", 200);
+
+ // Verify Nested Map
+ java.util.Map<Object, Object> nestedMapResult =
+ (java.util.Map<Object, Object>) record.get(1);
+ assertThat(nestedMapResult).isNotNull();
+ assertThat(nestedMapResult).hasSize(2);
+ java.util.Map<Object, Object> inner1 =
+ (java.util.Map<Object, Object>) nestedMapResult.get("outer_1");
+ assertThat(inner1).containsEntry("inner_a",
1).containsEntry("inner_b", 2);
+ java.util.Map<Object, Object> inner2 =
+ (java.util.Map<Object, Object>) nestedMapResult.get("outer_2");
+ assertThat(inner2).containsEntry("inner_c", 3);
+
+ // Verify Map in Array
+ List<?> mapArrayResult = (List<?>) record.get(2);
+ assertThat(mapArrayResult).hasSize(2);
+ java.util.Map<Object, Object> firstArrayMap =
+ (java.util.Map<Object, Object>) mapArrayResult.get(0);
+ assertThat(firstArrayMap).containsEntry("a", 10);
+ java.util.Map<Object, Object> secondArrayMap =
+ (java.util.Map<Object, Object>) mapArrayResult.get(1);
+ assertThat(secondArrayMap).containsEntry("b", 20);
+
+ // Verify Nullable Map
+ assertThat(record.get(3)).isNull();
+ }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
index 0366ee62d..d69def535 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.PredicateBuilder;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.RowType;
@@ -111,8 +112,7 @@ class IcebergLakeSourceTest extends IcebergSourceTestBase {
// test all filter can be accepted
Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
Predicate filter2 = FLUSS_BUILDER.lessOrEqual(0, 3);
- Predicate filter3 =
- FLUSS_BUILDER.startsWith(1,
org.apache.fluss.row.BinaryString.fromString("name"));
+ Predicate filter3 = FLUSS_BUILDER.startsWith(1,
BinaryString.fromString("name"));
List<Predicate> allFilters = Arrays.asList(filter1, filter2, filter3);
LakeSource<IcebergSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
@@ -141,8 +141,7 @@ class IcebergLakeSourceTest extends IcebergSourceTestBase {
assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3,
name3]]");
// test mix one unaccepted filter
- Predicate nonConvertibleFilter =
- FLUSS_BUILDER.endsWith(1,
org.apache.fluss.row.BinaryString.fromString("name"));
+ Predicate nonConvertibleFilter = FLUSS_BUILDER.endsWith(1,
BinaryString.fromString("name"));
allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
filterPushDownResult = lakeSource.withFilters(allFilters);
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 2aeb223ce..80244d727 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -80,10 +82,15 @@ class IcebergRecordAsFlussRowTest {
18,
"subfield2",
Types.IntegerType.get()))))),
+ optional(
+ 19,
+ "map_field",
+ Types.MapType.ofOptional(
+ 20, 21, Types.StringType.get(),
Types.IntegerType.get())),
// System columns
- required(19, "__bucket", Types.IntegerType.get()),
- required(20, "__offset", Types.LongType.get()),
- required(21, "__timestamp",
Types.TimestampType.withZone()));
+ required(22, "__bucket", Types.IntegerType.get()),
+ required(23, "__offset", Types.LongType.get()),
+ required(24, "__timestamp",
Types.TimestampType.withZone()));
record = GenericRecord.create(schema);
}
@@ -101,7 +108,7 @@ class IcebergRecordAsFlussRowTest {
icebergRecordAsFlussRow.replaceIcebergRecord(record);
// Should return count excluding system columns (3 system columns)
- assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
+ assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
}
@Test
@@ -164,7 +171,7 @@ class IcebergRecordAsFlussRowTest {
.isEqualTo("Hello"); // char_data
// Test field count (excluding system columns)
- assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14);
+ assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
}
@Test
@@ -195,4 +202,47 @@ class IcebergRecordAsFlussRowTest {
assertThat(deepNestedRow.getString(0).toString()).isEqualTo(subfield1Value);
assertThat(deepNestedRow.getInt(1)).isEqualTo(subfield2Value);
}
+
+ @Test
+ void testMapType() {
+ // Create a simple map with String keys and Integer values
+ Map<String, Integer> mapData = new HashMap<>();
+ mapData.put("key1", 100);
+ mapData.put("key2", 200);
+ mapData.put("key3", 300);
+
+ record.setField("id", 1L);
+ record.setField("map_field", mapData);
+ // System columns
+ record.setField("__bucket", 1);
+ record.setField("__offset", 100L);
+ record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
+
+ // Test map retrieval - map_field is at index 14
+ org.apache.fluss.row.InternalMap internalMap = icebergRecordAsFlussRow.getMap(14);
+ assertThat(internalMap).isNotNull();
+ assertThat(internalMap.size()).isEqualTo(3);
+
+ // Verify map contents by iterating through keys and values
+ org.apache.fluss.row.InternalArray keyArray = internalMap.keyArray();
+ org.apache.fluss.row.InternalArray valueArray = internalMap.valueArray();
+
+ Map<String, Integer> resultMap = new HashMap<>();
+ for (int i = 0; i < internalMap.size(); i++) {
+ String key = keyArray.getString(i).toString();
+ int value = valueArray.getInt(i);
+ resultMap.put(key, value);
+ }
+
+ assertThat(resultMap).containsEntry("key1", 100);
+ assertThat(resultMap).containsEntry("key2", 200);
+ assertThat(resultMap).containsEntry("key3", 300);
+
+ // Test null map
+ record.setField("map_field", null);
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
+ assertThat(icebergRecordAsFlussRow.isNullAt(14)).isTrue();
+ }
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
index f7d39498f..81fe569ef 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
@@ -29,6 +29,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit tests for {@link IcebergBinaryRowWriter}. */
class IcebergBinaryRowWriterTest {
@@ -218,4 +220,61 @@ class IcebergBinaryRowWriterTest {
byte[] writerBytesWithoutPrefix = writer.toBytes();
assertThat(writerBytesWithoutPrefix).isEqualTo(expectedBytes);
}
+
+ @Test
+ void testCreateFieldWriterRejectsMapType() {
+ org.apache.fluss.types.MapType mapType =
+ org.apache.fluss.types.DataTypes.MAP(
+ org.apache.fluss.types.DataTypes.STRING(),
+ org.apache.fluss.types.DataTypes.INT());
+
+ assertThatThrownBy(() -> IcebergBinaryRowWriter.createFieldWriter(mapType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Map types cannot be used as bucket keys")
+ .hasMessageContaining("Bucket keys must be scalar types");
+ }
+
+ @Test
+ void testCreateFieldWriterRejectsArrayType() {
+ org.apache.fluss.types.ArrayType arrayType =
+ org.apache.fluss.types.DataTypes.ARRAY(org.apache.fluss.types.DataTypes.INT());
+
+ assertThatThrownBy(() -> IcebergBinaryRowWriter.createFieldWriter(arrayType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Array types cannot be used as bucket keys")
+ .hasMessageContaining("Bucket keys must be scalar types");
+ }
+
+ @Test
+ void testCreateFieldWriterWithValidScalarTypes() {
+ assertThatCode(
+ () ->
+ IcebergBinaryRowWriter.createFieldWriter(
+ org.apache.fluss.types.DataTypes.STRING()))
+ .doesNotThrowAnyException();
+
+ assertThatCode(
+ () ->
+ IcebergBinaryRowWriter.createFieldWriter(
+ org.apache.fluss.types.DataTypes.INT()))
+ .doesNotThrowAnyException();
+
+ assertThatCode(
+ () ->
+ IcebergBinaryRowWriter.createFieldWriter(
+ org.apache.fluss.types.DataTypes.BIGINT()))
+ .doesNotThrowAnyException();
+
+ assertThatCode(
+ () ->
+ IcebergBinaryRowWriter.createFieldWriter(
+ org.apache.fluss.types.DataTypes.DOUBLE()))
+ .doesNotThrowAnyException();
+
+ assertThatCode(
+ () ->
+ IcebergBinaryRowWriter.createFieldWriter(
+ org.apache.fluss.types.DataTypes.DECIMAL(10, 2)))
+ .doesNotThrowAnyException();
+ }
}
diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
index cf3b3c2ba..b28191b05 100644
--- a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
+++ b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
@@ -482,27 +482,27 @@ SELECT * FROM iceberg_catalog.fluss.orders WHERE __bucket = 1 AND __offset >= 10
When integrating with Iceberg, Fluss automatically converts between Fluss data types and Iceberg data types:
-| Fluss Data Type | Iceberg Data Type | Notes |
-|-------------------------------|-------------------------------|---------------------|
-| BOOLEAN | BOOLEAN | |
-| TINYINT | INTEGER | Promoted to INT |
-| SMALLINT | INTEGER | Promoted to INT |
-| INT | INTEGER | |
-| BIGINT | LONG | |
-| FLOAT | FLOAT | |
-| DOUBLE | DOUBLE | |
-| DECIMAL | DECIMAL | |
-| STRING | STRING | |
-| CHAR | STRING | Converted to STRING |
-| DATE | DATE | |
-| TIME | TIME | |
-| TIMESTAMP | TIMESTAMP (without timezone) | |
-| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone) | |
-| BINARY | BINARY | |
-| BYTES | BINARY | Converted to BINARY |
-| ARRAY | LIST | |
-| ROW | STRUCT | |
-
+| Fluss Data Type | Iceberg Data Type | Notes |
+|-------------------------------|------------------------------|---------------------|
+| BOOLEAN | BOOLEAN | |
+| TINYINT | INTEGER | Promoted to INT |
+| SMALLINT | INTEGER | Promoted to INT |
+| INT | INTEGER | |
+| BIGINT | LONG | |
+| FLOAT | FLOAT | |
+| DOUBLE | DOUBLE | |
+| DECIMAL | DECIMAL | |
+| STRING | STRING | |
+| CHAR | STRING | Converted to STRING |
+| DATE | DATE | |
+| TIME | TIME | |
+| TIMESTAMP | TIMESTAMP (without timezone) | |
+| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone) | |
+| BINARY | BINARY | |
+| BYTES | BINARY | Converted to BINARY |
+| ARRAY | LIST | |
+| MAP | MAP | |
+| ROW | STRUCT | |
## Maintenance and Optimization
@@ -582,5 +582,4 @@ For partitioned tables, the metadata structure includes partition information:
## Current Limitations
-- **Complex Types**: Map type is not supported
- **Multiple bucket keys**: Not supported until Iceberg implements multi-argument partition transforms