Copilot commented on code in PR #2367: URL: https://github.com/apache/fluss/pull/2367#discussion_r2687737468
########## fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalMap; +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.MapType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimeType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.utils.DateTimeUtils; + +import java.nio.ByteBuffer; 
+import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.AbstractMap; +import java.util.AbstractSet; +import java.util.Iterator; +import java.util.Set; + +/** Adapter class for converting Fluss InternalMap to a Java Map for Iceberg. */ +public class FlussMapAsIcebergMap extends AbstractMap<Object, Object> { + + private final InternalMap flussMap; + private final DataType keyType; + private final DataType valueType; + + public FlussMapAsIcebergMap(InternalMap flussMap, DataType keyType, DataType valueType) { + this.flussMap = flussMap; + this.keyType = keyType; + this.valueType = valueType; + } + + @Override + public int size() { + return flussMap.size(); + } + + @Override + public Set<Entry<Object, Object>> entrySet() { + return new AbstractSet<>() { + @Override + public Iterator<Entry<Object, Object>> iterator() { + return new Iterator<>() { + private final InternalArray keyArray = flussMap.keyArray(); + private final InternalArray valueArray = flussMap.valueArray(); + private final int size = flussMap.size(); + private int currentIndex = 0; + + @Override + public boolean hasNext() { + return currentIndex < size; + } + + @Override + public Entry<Object, Object> next() { + Object key = convertElement(keyArray, currentIndex, keyType); + Object value = convertElement(valueArray, currentIndex, valueType); + currentIndex++; + return new AbstractMap.SimpleEntry<>(key, value); + } + }; + } + + @Override + public int size() { + return flussMap.size(); + } + }; + } + + private Object convertElement(InternalArray array, int index, DataType elementType) { + if (array.isNullAt(index)) { + return null; + } + + if (elementType instanceof BooleanType) { + return array.getBoolean(index); + } else if (elementType instanceof TinyIntType) { + return (int) array.getByte(index); + } else if (elementType instanceof SmallIntType) { + return (int) array.getShort(index); + } else if (elementType instanceof IntType) { + return 
array.getInt(index); + } else if (elementType instanceof BigIntType) { + return array.getLong(index); + } else if (elementType instanceof FloatType) { + return array.getFloat(index); + } else if (elementType instanceof DoubleType) { + return array.getDouble(index); + } else if (elementType instanceof StringType) { + return array.getString(index).toString(); + } else if (elementType instanceof CharType) { + CharType charType = (CharType) elementType; + return array.getChar(index, charType.getLength()).toString(); + } else if (elementType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) elementType; + return array.getDecimal(index, decimalType.getPrecision(), decimalType.getScale()) + .toBigDecimal(); + } else if (elementType instanceof LocalZonedTimestampType) { + LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType; + return toIcebergTimestampLtz( + array.getTimestampLtz(index, ltzType.getPrecision()).toInstant()); + } else if (elementType instanceof TimestampType) { + TimestampType tsType = (TimestampType) elementType; + return array.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime(); + } else if (elementType instanceof DateType) { + return DateTimeUtils.toLocalDate(array.getInt(index)); + } else if (elementType instanceof TimeType) { + return DateTimeUtils.toLocalTime(array.getInt(index)); + } else if (elementType instanceof BytesType || elementType instanceof BinaryType) { + return ByteBuffer.wrap(array.getBytes(index)); + } else if (elementType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) elementType; + InternalArray internalArray = array.getArray(index); + return new FlussArrayAsIcebergList(internalArray, arrayType.getElementType()); + } else if (elementType instanceof MapType) { + MapType mapType = (MapType) elementType; + InternalMap internalMap = array.getMap(index); + return new FlussMapAsIcebergMap( + internalMap, mapType.getKeyType(), mapType.getValueType()); + } else { + throw new 
UnsupportedOperationException( + "Unsupported array element type conversion for Fluss type: " + + elementType.getClass().getSimpleName()); + } Review Comment: The convertElement method is missing support for RowType, which is a supported nested type in Maps according to the test in FlussDataTypeToIcebergDataTypeMapTest.testMapWithRowValue. When a Map contains Row values (e.g., Map<String, Row<id: Int, name: String>>), this method will throw an UnsupportedOperationException. Add a case to handle RowType by retrieving the InternalRow from the array and converting it to a FlussRowAsIcebergRecord, similar to how it's done in FlussRowAsIcebergRecord.createTypeConverter. Note that the FlussRowAsIcebergRecord constructor used in the tests takes an Iceberg StructType in addition to the Fluss RowType, while this adapter currently holds only the Fluss key and value types. The corresponding Iceberg map type (or its key/value types) will therefore likely need to be passed in so the nested record can be built. ########## fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -196,6 +196,13 @@ public static FieldWriter createFieldWriter(DataType fieldType) { case BIGINT: return (writer, value) -> writer.writeLong((long) value); + + case FLOAT: + return (writer, value) -> writer.writeFloat((float) value); + + case DOUBLE: + return (writer, value) -> writer.writeDouble((double) value); + // support for nanoseconds come check again after #1195 merge Review Comment: The comment about nanoseconds support is misplaced. It appears after the DOUBLE case but logically should be placed immediately before or within the TIMESTAMP_WITHOUT_TIME_ZONE case that it refers to. Move the comment to line 207, right before the TIMESTAMP_WITHOUT_TIME_ZONE case starts.
```suggestion // support for nanoseconds come check again after #1195 merge ``` ########## fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java: ########## @@ -167,7 +167,19 @@ public Type visit(ArrayType arrayType) { @Override public Type visit(MapType mapType) { - throw new UnsupportedOperationException("Unsupported map type"); + Type keyType = mapType.getKeyType().accept(this); + Type valueType = mapType.getValueType().accept(this); + + // According to the Iceberg spec, + // the key and value fields of a map should have consecutive IDs + int keyFieldId = getNextId(); + int valueFieldId = getNextId(); + + if (mapType.getValueType().isNullable()) { + return Types.MapType.ofOptional(keyFieldId, valueFieldId, keyType, valueType); + } else { + return Types.MapType.ofRequired(keyFieldId, valueFieldId, keyType, valueType); + } Review Comment: The field ID allocation order is incorrect. Currently, the code converts the key and value types (lines 170-171) before allocating field IDs (lines 175-176). This means that if key or value types are complex types like nested Maps or Arrays that themselves consume field IDs, those IDs will be allocated before the map's own key and value field IDs. The Iceberg spec itself only requires field IDs to be unique, but Iceberg's own ID assignment allocates a map's key and value field IDs first and only then processes the nested types. The hand-built Iceberg schemas in the new tests follow the same order: in testNestedMap the outer map uses IDs 1 and 2 and the inner map uses 3 and 4. The correct order should be: get the key and value field IDs first using getNextId, then call accept on the key and value types to convert them.
########## fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java: ########## @@ -413,4 +413,390 @@ void testNestedRow() { // Verify Nullable Row assertThat(record.get(3)).isNull(); } + + @Test + void testMapWithAllTypes() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "string_to_string_map", + Types.MapType.ofRequired( + 1, 2, Types.StringType.get(), Types.StringType.get())), + Types.NestedField.required( + 3, + "string_to_int_map", + Types.MapType.ofRequired( + 4, 5, Types.StringType.get(), Types.IntegerType.get())), + Types.NestedField.required( + 6, + "int_to_string_map", + Types.MapType.ofRequired( + 7, 8, Types.IntegerType.get(), Types.StringType.get())), + Types.NestedField.required( + 9, + "string_to_long_map", + Types.MapType.ofRequired( + 10, 11, Types.StringType.get(), Types.LongType.get())), + Types.NestedField.required( + 12, + "string_to_double_map", + Types.MapType.ofRequired( + 13, 14, Types.StringType.get(), Types.DoubleType.get())), + Types.NestedField.required( + 15, + "string_to_decimal_map", + Types.MapType.ofRequired( + 16, + 17, + Types.StringType.get(), + Types.DecimalType.of(10, 2))), + Types.NestedField.required( + 18, + "string_to_boolean_map", + Types.MapType.ofRequired( + 19, 20, Types.StringType.get(), Types.BooleanType.get())), + Types.NestedField.optional( + 21, + "null_map", + Types.MapType.ofRequired( + 22, 23, Types.StringType.get(), Types.StringType.get()))); + + RowType flussRowType = + RowType.of( + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.DECIMAL(10, 2)), + DataTypes.MAP(DataTypes.STRING(), DataTypes.BOOLEAN()), + 
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); + + GenericRow genericRow = new GenericRow(8); + + // String to String map + org.apache.fluss.row.GenericMap stringToStringMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put( + BinaryString.fromString("key1"), + BinaryString.fromString("value1")); + put( + BinaryString.fromString("key2"), + BinaryString.fromString("value2")); + put( + BinaryString.fromString("key3"), + BinaryString.fromString("value3")); + } + }); + genericRow.setField(0, stringToStringMap); + + // String to Int map + org.apache.fluss.row.GenericMap stringToIntMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("num1"), 100); + put(BinaryString.fromString("num2"), 200); + put(BinaryString.fromString("num3"), 300); + } + }); + genericRow.setField(1, stringToIntMap); + + // Int to String map + org.apache.fluss.row.GenericMap intToStringMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(1, BinaryString.fromString("one")); + put(2, BinaryString.fromString("two")); + put(3, BinaryString.fromString("three")); + } + }); + genericRow.setField(2, intToStringMap); + + // String to Long map + org.apache.fluss.row.GenericMap stringToLongMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("big1"), 10000L); + put(BinaryString.fromString("big2"), 20000L); + } + }); + genericRow.setField(3, stringToLongMap); + + // String to Double map + org.apache.fluss.row.GenericMap stringToDoubleMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("pi"), 3.14); + put(BinaryString.fromString("e"), 2.71); + } + }); + genericRow.setField(4, stringToDoubleMap); + + // String to Decimal map + org.apache.fluss.row.GenericMap stringToDecimalMap = + new 
org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put( + BinaryString.fromString("dec1"), + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + put( + BinaryString.fromString("dec2"), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2)); + } + }); + genericRow.setField(5, stringToDecimalMap); + + // String to Boolean map + org.apache.fluss.row.GenericMap stringToBooleanMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("flag1"), true); + put(BinaryString.fromString("flag2"), false); + put(BinaryString.fromString("flag3"), true); + } + }); + genericRow.setField(6, stringToBooleanMap); + + // Null map + genericRow.setField(7, null); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + // Test String to String map + java.util.Map<Object, Object> stringToStringResult = + (java.util.Map<Object, Object>) record.get(0); + assertThat(stringToStringResult).isNotNull(); + assertThat(stringToStringResult) + .containsEntry("key1", "value1") + .containsEntry("key2", "value2") + .containsEntry("key3", "value3"); + + // Test String to Int map + java.util.Map<Object, Object> stringToIntResult = + (java.util.Map<Object, Object>) record.get(1); + assertThat(stringToIntResult).isNotNull(); + assertThat(stringToIntResult) + .containsEntry("num1", 100) + .containsEntry("num2", 200) + .containsEntry("num3", 300); + + // Test Int to String map + java.util.Map<Object, Object> intToStringResult = + (java.util.Map<Object, Object>) record.get(2); + assertThat(intToStringResult).isNotNull(); + assertThat(intToStringResult) + .containsEntry(1, "one") + .containsEntry(2, "two") + .containsEntry(3, "three"); + + // Test String to Long map + java.util.Map<Object, Object> stringToLongResult = + (java.util.Map<Object, Object>) record.get(3); + assertThat(stringToLongResult).isNotNull(); + 
assertThat(stringToLongResult).containsEntry("big1", 10000L).containsEntry("big2", 20000L); + + // Test String to Double map + java.util.Map<Object, Object> stringToDoubleResult = + (java.util.Map<Object, Object>) record.get(4); + assertThat(stringToDoubleResult).isNotNull(); + assertThat(stringToDoubleResult.get("pi")).isEqualTo(3.14); + assertThat(stringToDoubleResult.get("e")).isEqualTo(2.71); + + // Test String to Decimal map + java.util.Map<Object, Object> stringToDecimalResult = + (java.util.Map<Object, Object>) record.get(5); + assertThat(stringToDecimalResult).isNotNull(); + assertThat(stringToDecimalResult.get("dec1")).isEqualTo(new BigDecimal("123.45")); + assertThat(stringToDecimalResult.get("dec2")).isEqualTo(new BigDecimal("678.90")); + + // Test String to Boolean map + java.util.Map<Object, Object> stringToBooleanResult = + (java.util.Map<Object, Object>) record.get(6); + assertThat(stringToBooleanResult).isNotNull(); + assertThat(stringToBooleanResult) + .containsEntry("flag1", true) + .containsEntry("flag2", false) + .containsEntry("flag3", true); + + // Test null map + assertThat(record.get(7)).isNull(); + } + + @Test + void testMapWithNullValues() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "map_with_nulls", + Types.MapType.ofOptional( + 1, 2, Types.StringType.get(), Types.StringType.get()))); + + RowType flussRowType = RowType.of(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); + + GenericRow genericRow = new GenericRow(1); + + org.apache.fluss.row.GenericMap mapWithNulls = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put( + BinaryString.fromString("key1"), + BinaryString.fromString("value1")); + put(BinaryString.fromString("key2"), null); + put( + BinaryString.fromString("key3"), + BinaryString.fromString("value3")); + put(BinaryString.fromString("key4"), null); + } + }); + genericRow.setField(0, mapWithNulls); + + FlussRowAsIcebergRecord record 
= new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + java.util.Map<Object, Object> resultMap = (java.util.Map<Object, Object>) record.get(0); + assertThat(resultMap).isNotNull(); + assertThat(resultMap).hasSize(4); + assertThat(resultMap) + .containsEntry("key1", "value1") + .containsEntry("key2", null) + .containsEntry("key3", "value3") + .containsEntry("key4", null); + } + + @Test + void testNestedMapInArray() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "map_array", + Types.ListType.ofRequired( + 1, + Types.MapType.ofRequired( + 2, + 3, + Types.StringType.get(), + Types.IntegerType.get())))); + + RowType flussRowType = + RowType.of(DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))); + + GenericRow genericRow = new GenericRow(1); + + org.apache.fluss.row.GenericMap map1 = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("a"), 1); + put(BinaryString.fromString("b"), 2); + } + }); + + org.apache.fluss.row.GenericMap map2 = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("c"), 3); + put(BinaryString.fromString("d"), 4); + } + }); + + genericRow.setField(0, new GenericArray(new Object[] {map1, map2})); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List<?> arrayResult = (List<?>) record.get(0); + assertThat(arrayResult).hasSize(2); + + java.util.Map<Object, Object> firstMap = (java.util.Map<Object, Object>) arrayResult.get(0); + assertThat(firstMap).containsEntry("a", 1).containsEntry("b", 2); + + java.util.Map<Object, Object> secondMap = + (java.util.Map<Object, Object>) arrayResult.get(1); + assertThat(secondMap).containsEntry("c", 3).containsEntry("d", 4); + } + + @Test + void testNestedMap() { + Types.StructType structType = + 
Types.StructType.of( + Types.NestedField.required( + 0, + "nested_map", + Types.MapType.ofRequired( + 1, + 2, + Types.StringType.get(), + Types.MapType.ofRequired( + 3, + 4, + Types.StringType.get(), + Types.IntegerType.get())))); + + RowType flussRowType = + RowType.of( + DataTypes.MAP( + DataTypes.STRING(), + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))); + + GenericRow genericRow = new GenericRow(1); + + // Create inner map + org.apache.fluss.row.GenericMap innerMap1 = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("inner_a"), 100); + put(BinaryString.fromString("inner_b"), 200); + } + }); + + org.apache.fluss.row.GenericMap innerMap2 = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("inner_c"), 300); + } + }); + + // Create outer map containing inner maps + org.apache.fluss.row.GenericMap outerMap = + new org.apache.fluss.row.GenericMap( + new java.util.HashMap<Object, Object>() { + { + put(BinaryString.fromString("outer_1"), innerMap1); + put(BinaryString.fromString("outer_2"), innerMap2); + } + }); + + genericRow.setField(0, outerMap); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + java.util.Map<Object, Object> resultMap = (java.util.Map<Object, Object>) record.get(0); + assertThat(resultMap).isNotNull(); + assertThat(resultMap).hasSize(2); + + java.util.Map<Object, Object> result1 = + (java.util.Map<Object, Object>) resultMap.get("outer_1"); + assertThat(result1).containsEntry("inner_a", 100).containsEntry("inner_b", 200); + + java.util.Map<Object, Object> result2 = + (java.util.Map<Object, Object>) resultMap.get("outer_2"); + assertThat(result2).containsEntry("inner_c", 300); + } Review Comment: The new test class FlussDataTypeToIcebergDataTypeMapTest includes testMapWithRowValue that tests Map<String, Row> type conversion, but 
there is no corresponding runtime test in FlussRowAsIcebergRecordTest to verify that the actual data conversion works end-to-end. Consider adding a test case that creates a GenericRow with a Map containing Row values and verifies the conversion produces the correct Iceberg Record. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
