wuchong commented on code in PR #2190:
URL: https://github.com/apache/fluss/pull/2190#discussion_r2649590715


##########
fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java:
##########
@@ -395,7 +396,13 @@ public InternalArray getArray(int pos) {
                 segments, offset, offsetAndSize, new AlignedArray());
     }
 
-    // TODO: getMap() will be added in Issue #1973
+    @Override
+    public InternalMap getMap(int pos) {
+        assertIndexIsValid(pos);
+        int fieldOffset = getFieldOffset(pos);
+        final long offsetAndSize = BinarySegmentUtils.getLong(segments, 
fieldOffset);

Review Comment:
   Keep this aligned with the others and use `final long offsetAndSize = 
segments[0].getLong(fieldOffset)`, as we don't support multi-segments yet.



##########
fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowMapColumnVector.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.arrow.vectors;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.columnar.ColumnVector;
+import org.apache.fluss.row.columnar.ColumnarMap;
+import org.apache.fluss.row.columnar.MapColumnVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/** ArrowMapColumnVector is a wrapper class for Arrow MapVector. */
+public class ArrowMapColumnVector implements MapColumnVector {
+    private boolean inited = false;
+    private final FieldVector vector;
+    private final DataType keyType;
+    private final DataType valueType;
+    private ColumnVector keyColumnVector;
+    private ColumnVector valueColumnVector;
+
+    public ArrowMapColumnVector(FieldVector vector, DataType keyType, DataType 
valueType) {
+        this.vector = vector;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    private void init() {
+        if (!inited) {
+            FieldVector mapVector = ((MapVector) vector).getDataVector();
+            StructVector structVector = (StructVector) mapVector;
+            FieldVector keyVector = 
structVector.getChildrenFromFields().get(0);
+            FieldVector valueVector = 
structVector.getChildrenFromFields().get(1);
+            this.keyColumnVector = 
ArrowUtils.createArrowColumnVector(keyVector, keyType);
+            this.valueColumnVector = 
ArrowUtils.createArrowColumnVector(valueVector, valueType);
+            inited = true;
+        }
+    }
+
+    @Override
+    public InternalMap getMap(int i) {
+        init();
+        MapVector mapVector = (MapVector) vector;
+        int start = mapVector.getElementStartIndex(i);
+        int end = mapVector.getElementEndIndex(i);
+
+        return new ColumnarMap(keyColumnVector, valueColumnVector, start, end 
- start);
+    }
+
+    public ColumnVector getKeyColumnVector() {
+        init();
+        return keyColumnVector;
+    }
+
+    public ColumnVector getValueColumnVector() {
+        init();
+        return valueColumnVector;
+    }

Review Comment:
   unused, remove



##########
fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarMap.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.columnar;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Columnar map to support access to vector column data.
+ *
+ * @since 0.9
+ */
+@Internal
+public class ColumnarMap implements InternalMap {
+
+    private final InternalArray keyArray;
+    private final InternalArray valueArray;
+
+    public ColumnarMap(InternalArray keyArray, InternalArray valueArray) {
+        checkNotNull(keyArray, "keyArray must not be null");
+        checkNotNull(valueArray, "valueArray must not be null");
+        if (keyArray.size() != valueArray.size()) {
+            throw new IllegalArgumentException(
+                    "Key array size and value array size must be equal. Key 
array size: "
+                            + keyArray.size()
+                            + ", value array size: "
+                            + valueArray.size());
+        }
+        this.keyArray = keyArray;
+        this.valueArray = valueArray;
+    }
+
+    public ColumnarMap(
+            ColumnVector keyColumnVector,
+            ColumnVector valueColumnVector,
+            int offset,
+            int numElements) {
+        this.keyArray = new ColumnarArray(keyColumnVector, offset, 
numElements);
+        this.valueArray = new ColumnarArray(valueColumnVector, offset, 
numElements);

Review Comment:
   better to invoke `this(..)` to apply the same validation.
   
   ```java
   this(
                   new ColumnarArray(keyColumnVector, offset, numElements),
                   new ColumnarArray(valueColumnVector, offset, numElements));
   ```



##########
fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java:
##########
@@ -325,7 +326,9 @@ static FieldReader createFieldReader(DataType fieldType) {
                 DataType elementType = ((ArrayType) 
fieldType).getElementType();
                 fieldReader = (reader, pos) -> reader.readArray(elementType);
                 break;
-
+            case MAP:
+                fieldReader = (reader, pos) -> reader.readMap();

Review Comment:
   We should pass `keyType` and `valueType` into it; otherwise, we can't support 
nested types such as `Map<K, Row>`.



##########
fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java:
##########
@@ -434,9 +450,24 @@ private static Field toArrowField(String fieldName, 
DataType logicalType) {
             for (DataField field : rowType.getFields()) {
                 children.add(toArrowField(field.getName(), field.getType()));
             }
+        } else if (logicalType instanceof MapType) {
+            MapType mapType = (MapType) logicalType;
+            DataType keyType = mapType.getKeyType();
+            if (keyType.isNullable()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Map key type must be non-nullable for Arrow 
conversion, but got: %s",
+                                keyType.asSummaryString()));
+            }
+            Field keyField = toArrowField("key", keyType);
+            Field valueField = toArrowField("value", mapType.getValueType());
+            FieldType structFieldType = new FieldType(false, 
ArrowType.Struct.INSTANCE, null);
+            List<Field> structChildren = new ArrayList<>();
+            structChildren.add(keyField);
+            structChildren.add(valueField);
+            Field structField = new Field("entries", structFieldType, 
structChildren);

Review Comment:
   Use `MapVector.KEY_NAME`, `MapVector.VALUE_NAME` and 
`MapVector.DATA_VECTOR_NAME` to replace the hard-coded string names ("key", 
"value", "entries").



##########
fluss-common/src/test/java/org/apache/fluss/row/columnar/ColumnarMapTest.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.columnar;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.InternalArray;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ColumnarMap}. */
+public class ColumnarMapTest {
+
+    @Test
+    public void testConstructorWithInternalArrays() {
+        Object[] keys = {
+            BinaryString.fromString("key1"),
+            BinaryString.fromString("key2"),
+            BinaryString.fromString("key3")
+        };
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray()).isEqualTo(keyArray);
+        assertThat(map.valueArray()).isEqualTo(valueArray);
+    }
+
+    @Test
+    public void testConstructorWithColumnVectors() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 5);
+
+        assertThat(map.size()).isEqualTo(5);
+        assertThat(map.keyArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.valueArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(1);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(10);
+    }
+
+    @Test
+    public void testConstructorWithOffsetAndNumElements() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
2, 2);
+
+        assertThat(map.size()).isEqualTo(2);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(3);
+        assertThat(map.keyArray().getInt(1)).isEqualTo(4);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(30);
+        assertThat(map.valueArray().getInt(1)).isEqualTo(40);
+    }
+
+    @Test
+    public void testConstructorWithNullKeyArray() {
+        Object[] values = {1, 2, 3};
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(null, valueArray))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("keyArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithNullValueArray() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        InternalArray keyArray = new GenericArray(keys);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("valueArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithMismatchedArraySizes() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, valueArray))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key array size and value array size 
must be equal");
+    }
+
+    @Test
+    public void testSize() {
+        Object[] keys = {
+            BinaryString.fromString("a"),
+            BinaryString.fromString("b"),
+            BinaryString.fromString("c"),
+            BinaryString.fromString("d")
+        };
+        Object[] values = {1, 2, 3, 4};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testEmptyMap() {
+        Object[] keys = {};
+        Object[] values = {};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(0);
+        assertThat(map.keyArray().size()).isEqualTo(0);
+        assertThat(map.valueArray().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testKeyArrayAndValueArray() {
+        Object[] keys = {
+            BinaryString.fromString("name"),
+            BinaryString.fromString("age"),
+            BinaryString.fromString("city")
+        };
+        Object[] values = {BinaryString.fromString("Alice"), 30, 
BinaryString.fromString("NYC")};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.keyArray()).isSameAs(keyArray);
+        assertThat(map.valueArray()).isSameAs(valueArray);
+
+        
assertThat(map.keyArray().getString(0)).isEqualTo(BinaryString.fromString("name"));
+        
assertThat(map.valueArray().getString(0)).isEqualTo(BinaryString.fromString("Alice"));
+        
assertThat(map.keyArray().getString(1)).isEqualTo(BinaryString.fromString("age"));
+        assertThat(map.valueArray().getInt(1)).isEqualTo(30);
+    }
+
+    @Test
+    public void testEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 2};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map1)).isTrue();
+        assertThat(map1.equals(map2)).isTrue();
+        assertThat(map2.equals(map1)).isTrue();
+    }
+
+    @Test
+    public void testNotEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 3};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map2)).isFalse();
+        assertThat(map1.equals(null)).isFalse();
+        assertThat(map1.equals("string")).isFalse();
+    }
+
+    @Test
+    public void testHashCode() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {100, 200};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map1 = new ColumnarMap(keyArray, valueArray);
+        ColumnarMap map2 = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map1.hashCode()).isEqualTo(map2.hashCode());
+    }
+
+    @Test
+    public void testToString() {
+        Object[] keys = {BinaryString.fromString("k1")};
+        Object[] values = {42};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        String result = map.toString();
+        assertThat(result).contains("ColumnarMap");
+        assertThat(result).contains("keyArray");
+        assertThat(result).contains("valueArray");
+    }
+
+    @Test
+    public void testMapWithNullValues() {
+        Object[] keys = {
+            BinaryString.fromString("k1"),
+            BinaryString.fromString("k2"),
+            BinaryString.fromString("k3")
+        };
+        Object[] values = {1, null, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.valueArray().isNullAt(0)).isFalse();
+        assertThat(map.valueArray().isNullAt(1)).isTrue();
+        assertThat(map.valueArray().isNullAt(2)).isFalse();
+    }
+
+    @Test
+    public void testMapWithDifferentDataTypes() {
+        long[] keys = {1L, 2L, 3L};
+        double[] values = {1.1, 2.2, 3.3};
+
+        TestLongColumnVector keyColumnVector = new TestLongColumnVector(keys);
+        TestDoubleColumnVector valueColumnVector = new 
TestDoubleColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 3);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray().getLong(0)).isEqualTo(1L);
+        assertThat(map.valueArray().getDouble(0)).isEqualTo(1.1);
+        assertThat(map.keyArray().getLong(2)).isEqualTo(3L);
+        assertThat(map.valueArray().getDouble(2)).isEqualTo(3.3);
+    }
+
+    private static class TestIntColumnVector implements IntColumnVector {
+        private final int[] values;
+        private final boolean[] nulls;
+
+        TestIntColumnVector(int[] values) {
+            this.values = values;
+            this.nulls = new boolean[values.length];
+        }
+
+        @Override
+        public int getInt(int i) {
+            return values[i];
+        }
+
+        @Override
+        public boolean isNullAt(int i) {
+            return nulls[i];
+        }
+    }
+
+    private static class TestLongColumnVector implements LongColumnVector {
+        private final long[] values;
+        private final boolean[] nulls;
+
+        TestLongColumnVector(long[] values) {
+            this.values = values;
+            this.nulls = new boolean[values.length];
+        }
+
+        @Override
+        public long getLong(int i) {
+            return values[i];
+        }
+
+        @Override
+        public boolean isNullAt(int i) {
+            return nulls[i];
+        }
+    }
+
+    private static class TestDoubleColumnVector implements DoubleColumnVector {
+        private final double[] values;
+        private final boolean[] nulls;
+
+        TestDoubleColumnVector(double[] values) {
+            this.values = values;
+            this.nulls = new boolean[values.length];

Review Comment:
   ditto



##########
fluss-common/src/test/java/org/apache/fluss/row/columnar/ColumnarMapTest.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.columnar;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.InternalArray;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ColumnarMap}. */
+public class ColumnarMapTest {
+
+    @Test
+    public void testConstructorWithInternalArrays() {
+        Object[] keys = {
+            BinaryString.fromString("key1"),
+            BinaryString.fromString("key2"),
+            BinaryString.fromString("key3")
+        };
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray()).isEqualTo(keyArray);
+        assertThat(map.valueArray()).isEqualTo(valueArray);
+    }
+
+    @Test
+    public void testConstructorWithColumnVectors() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 5);
+
+        assertThat(map.size()).isEqualTo(5);
+        assertThat(map.keyArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.valueArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(1);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(10);
+    }
+
+    @Test
+    public void testConstructorWithOffsetAndNumElements() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
2, 2);
+
+        assertThat(map.size()).isEqualTo(2);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(3);
+        assertThat(map.keyArray().getInt(1)).isEqualTo(4);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(30);
+        assertThat(map.valueArray().getInt(1)).isEqualTo(40);
+    }
+
+    @Test
+    public void testConstructorWithNullKeyArray() {
+        Object[] values = {1, 2, 3};
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(null, valueArray))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("keyArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithNullValueArray() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        InternalArray keyArray = new GenericArray(keys);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("valueArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithMismatchedArraySizes() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, valueArray))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key array size and value array size 
must be equal");
+    }
+
+    @Test
+    public void testSize() {
+        Object[] keys = {
+            BinaryString.fromString("a"),
+            BinaryString.fromString("b"),
+            BinaryString.fromString("c"),
+            BinaryString.fromString("d")
+        };
+        Object[] values = {1, 2, 3, 4};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testEmptyMap() {
+        Object[] keys = {};
+        Object[] values = {};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(0);
+        assertThat(map.keyArray().size()).isEqualTo(0);
+        assertThat(map.valueArray().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testKeyArrayAndValueArray() {
+        Object[] keys = {
+            BinaryString.fromString("name"),
+            BinaryString.fromString("age"),
+            BinaryString.fromString("city")
+        };
+        Object[] values = {BinaryString.fromString("Alice"), 30, 
BinaryString.fromString("NYC")};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.keyArray()).isSameAs(keyArray);
+        assertThat(map.valueArray()).isSameAs(valueArray);
+
+        
assertThat(map.keyArray().getString(0)).isEqualTo(BinaryString.fromString("name"));
+        
assertThat(map.valueArray().getString(0)).isEqualTo(BinaryString.fromString("Alice"));
+        
assertThat(map.keyArray().getString(1)).isEqualTo(BinaryString.fromString("age"));
+        assertThat(map.valueArray().getInt(1)).isEqualTo(30);
+    }
+
+    @Test
+    public void testEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 2};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map1)).isTrue();
+        assertThat(map1.equals(map2)).isTrue();
+        assertThat(map2.equals(map1)).isTrue();
+    }
+
+    @Test
+    public void testNotEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 3};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map2)).isFalse();
+        assertThat(map1.equals(null)).isFalse();
+        assertThat(map1.equals("string")).isFalse();
+    }
+
+    @Test
+    public void testHashCode() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {100, 200};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map1 = new ColumnarMap(keyArray, valueArray);
+        ColumnarMap map2 = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map1.hashCode()).isEqualTo(map2.hashCode());
+    }
+
+    @Test
+    public void testToString() {
+        Object[] keys = {BinaryString.fromString("k1")};
+        Object[] values = {42};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        String result = map.toString();
+        assertThat(result).contains("ColumnarMap");
+        assertThat(result).contains("keyArray");
+        assertThat(result).contains("valueArray");
+    }
+
+    @Test
+    public void testMapWithNullValues() {
+        Object[] keys = {
+            BinaryString.fromString("k1"),
+            BinaryString.fromString("k2"),
+            BinaryString.fromString("k3")
+        };
+        Object[] values = {1, null, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.valueArray().isNullAt(0)).isFalse();
+        assertThat(map.valueArray().isNullAt(1)).isTrue();
+        assertThat(map.valueArray().isNullAt(2)).isFalse();
+    }
+
+    @Test
+    public void testMapWithDifferentDataTypes() {
+        long[] keys = {1L, 2L, 3L};
+        double[] values = {1.1, 2.2, 3.3};
+
+        TestLongColumnVector keyColumnVector = new TestLongColumnVector(keys);
+        TestDoubleColumnVector valueColumnVector = new 
TestDoubleColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 3);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray().getLong(0)).isEqualTo(1L);
+        assertThat(map.valueArray().getDouble(0)).isEqualTo(1.1);
+        assertThat(map.keyArray().getLong(2)).isEqualTo(3L);
+        assertThat(map.valueArray().getDouble(2)).isEqualTo(3.3);
+    }
+
+    private static class TestIntColumnVector implements IntColumnVector {
+        private final int[] values;
+        private final boolean[] nulls;
+
+        TestIntColumnVector(int[] values) {
+            this.values = values;
+            this.nulls = new boolean[values.length];
+        }
+
+        @Override
+        public int getInt(int i) {
+            return values[i];
+        }
+
+        @Override
+        public boolean isNullAt(int i) {
+            return nulls[i];
+        }
+    }
+
+    private static class TestLongColumnVector implements LongColumnVector {
+        private final long[] values;
+        private final boolean[] nulls;
+
+        TestLongColumnVector(long[] values) {
+            this.values = values;
+            this.nulls = new boolean[values.length];

Review Comment:
   ditto



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java:
##########
@@ -304,6 +304,108 @@ void testRowTypesInLogTable() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @Test
+    void testMapTypesInLogTable() throws Exception {

Review Comment:
   I found that the current tests do not cover all the cases: PK table tests 
for the Map and Row types are missing, and so is a partitioned table test for 
the Row type. 
   
   Could you merge the map type tests and row type tests into the existing 
`testArrayTypesInPrimaryKeyTable`, `testArrayTypesInPartitionedLogTable`, and 
`testArrayTypesInLogTable`, and rename them to 
`testComplexTypesInPrimaryKeyTable`, `testComplexTypesInPartitionedLogTable`, 
and `testComplexTypesInLogTable`? This can also reduce the test time. 
   
   Besides, could you add more nested type tests? Like 
   - `array<map<string, double>>`
   - `row<map<int, array<float>>>`
   - `map<bigint, row<string, array<string>, array<int>>>`
   
   
   
   
   



##########
fluss-common/src/main/java/org/apache/fluss/row/BinaryMap.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Binary implementation of {@link InternalMap} backed by {@link 
MemorySegment}s.
+ *
+ * <p>The binary layout of {@link BinaryMap}:
+ *
+ * <pre>
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ * </pre>
+ *
+ * <p>Influenced by Apache Spark UnsafeMapData.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public class BinaryMap extends BinarySection implements InternalMap {

Review Comment:
   Follow `BinaryArray`: mark this as an `abstract` class and add a 
`PrimitiveBinaryMap` subclass. 



##########
fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarRow.java:
##########
@@ -137,7 +138,10 @@ public InternalArray getArray(int pos) {
         return vectorizedColumnBatch.getArray(rowId, pos);
     }
 
-    // TODO: getMap() will be added in Issue #1973
+    @Override
+    public InternalMap getMap(int pos) {
+        return ((MapColumnVector) 
vectorizedColumnBatch.getColumn(pos)).getMap(rowId);

Review Comment:
   Why not implement this like `getArray()`, i.e. by having a `getMap` method 
directly on `VectorizedColumnBatch`?



##########
fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowMapColumnVector.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.arrow.vectors;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.columnar.ColumnVector;
+import org.apache.fluss.row.columnar.ColumnarMap;
+import org.apache.fluss.row.columnar.MapColumnVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/** ArrowMapColumnVector is a wrapper class for Arrow MapVector. */
+public class ArrowMapColumnVector implements MapColumnVector {
+    private boolean inited = false;
+    private final FieldVector vector;
+    private final DataType keyType;
+    private final DataType valueType;
+    private ColumnVector keyColumnVector;
+    private ColumnVector valueColumnVector;
+
+    public ArrowMapColumnVector(FieldVector vector, DataType keyType, DataType 
valueType) {
+        this.vector = vector;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    private void init() {
+        if (!inited) {
+            FieldVector mapVector = ((MapVector) vector).getDataVector();
+            StructVector structVector = (StructVector) mapVector;
+            FieldVector keyVector = 
structVector.getChildrenFromFields().get(0);
+            FieldVector valueVector = 
structVector.getChildrenFromFields().get(1);
+            this.keyColumnVector = 
ArrowUtils.createArrowColumnVector(keyVector, keyType);
+            this.valueColumnVector = 
ArrowUtils.createArrowColumnVector(valueVector, valueType);
+            inited = true;
+        }
+    }
+
+    @Override
+    public InternalMap getMap(int i) {
+        init();
+        MapVector mapVector = (MapVector) vector;

Review Comment:
   Store `vector` directly as a `MapVector` in `ArrowMapColumnVector` to avoid 
casting per record. 



##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java:
##########
@@ -321,6 +356,18 @@ void testComplexTypeFetch() throws Exception {
                     .isEqualTo(((Object[]) ((Object[]) 
complexData.get(i)[3])[1])[1]);
             assertThat(nestedRow.getString(2).toString())
                     .isEqualTo(((Object[]) complexData.get(i)[3])[2]);
+
+            assertThat(row.getMap(4)).isInstanceOf(GenericMap.class);
+            GenericMap simpleMap = (GenericMap) row.getMap(4);
+            assertThat(simpleMap.size()).isGreaterThan(0);
+
+            assertThat(row.getMap(5)).isInstanceOf(GenericMap.class);
+            GenericMap nestedMap = (GenericMap) row.getMap(5);
+            assertThat(nestedMap.size()).isGreaterThan(0);
+
+            assertThat(row.getMap(6)).isInstanceOf(GenericMap.class);
+            GenericMap mapWithArray = (GenericMap) row.getMap(6);
+            assertThat(mapWithArray.size()).isGreaterThan(0);

Review Comment:
   Actually, this approach doesn’t verify the actual field values, and it 
significantly inflates the line count, making the code harder to maintain.  
   
   Moreover, we’ll soon need to add assertions for column projection here as 
well.
   
   My suggestion: prepare a list of **expected rows** upfront and perform a 
direct assert equals comparison with the actual result. 
   
   This keeps the test concise, expressive, and much easier to maintain.



##########
fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowMapWriter.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.arrow.writers;
+
+import org.apache.fluss.row.DataGetters;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
+
+/** {@link ArrowFieldWriter} for Map. */
+public class ArrowMapWriter extends ArrowFieldWriter {
+
+    private final ArrowFieldWriter keyWriter;
+    private final ArrowFieldWriter valueWriter;
+    private int offset;
+
+    public ArrowMapWriter(
+            FieldVector fieldVector, ArrowFieldWriter keyWriter, 
ArrowFieldWriter valueWriter) {
+        super(fieldVector);
+        this.keyWriter = keyWriter;
+        this.valueWriter = valueWriter;
+    }
+
+    @Override
+    public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean 
handleSafe) {
+        InternalMap map = row.getMap(ordinal);
+        MapVector mapVector = (MapVector) fieldVector;
+        StructVector structVector = (StructVector) mapVector.getDataVector();
+
+        InternalArray keyArray = map.keyArray();
+        InternalArray valueArray = map.valueArray();
+
+        mapVector.startNewValue(rowIndex);
+        for (int i = 0; i < map.size(); i++) {
+            int fieldIndex = offset + i;
+            structVector.setIndexDefined(fieldIndex);
+            keyWriter.write(fieldIndex, keyArray, i, handleSafe);
+            valueWriter.write(fieldIndex, valueArray, i, handleSafe);
+        }
+        offset += map.size();

Review Comment:
   We should override the `reset()` method to reset `offset`, `keyWriter`, and 
`valueWriter`. 
   
   I also found that we forgot to add the `reset()` for the newly introduced 
`ArrowRowWriter`.



##########
fluss-common/src/main/java/org/apache/fluss/row/BinaryMap.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Binary implementation of {@link InternalMap} backed by {@link 
MemorySegment}s.
+ *
+ * <p>The binary layout of {@link BinaryMap}:
+ *
+ * <pre>
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ * </pre>
+ *
+ * <p>Influenced by Apache Spark UnsafeMapData.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public class BinaryMap extends BinarySection implements InternalMap {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient BinaryArray keys;
+    private transient BinaryArray values;
+
+    private final transient Supplier<BinaryArray> keyArraySupplier;
+    private final transient Supplier<BinaryArray> valueArraySupplier;
+    private final transient Supplier<BinaryMap> nestedMapSupplier;
+
+    public BinaryMap() {
+        this(
+                (Supplier<BinaryArray> & Serializable) 
PrimitiveBinaryArray::new,
+                (Supplier<BinaryArray> & Serializable) 
PrimitiveBinaryArray::new,
+                (Supplier<BinaryMap> & Serializable) BinaryMap::new);
+    }
+
+    public BinaryMap(
+            Supplier<BinaryArray> keyArraySupplier,
+            Supplier<BinaryArray> valueArraySupplier,
+            Supplier<BinaryMap> nestedMapSupplier) {
+        this.keyArraySupplier = keyArraySupplier;
+        this.valueArraySupplier = valueArraySupplier;
+        this.nestedMapSupplier = nestedMapSupplier;
+    }
+
+    @Override
+    public int size() {
+        return keys.size();
+    }
+
+    @Override
+    public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) 
{
+        // Read the numBytes of key array from the first 4 bytes.
+        final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
+        assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") 
should >= 0";
+        final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
+        assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + 
") should >= 0";
+
+        // see BinarySection.readObject, on this call stack, keys and values 
are not initialized
+        if (keys == null) {
+            keys = createKeyArrayInstance();
+        }
+        keys.pointTo(segments, offset + 4, keyArrayBytes);
+        if (values == null) {
+            values = createValueArrayInstance();
+        }
+        values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
+
+        assert keys.size() == values.size();
+
+        this.segments = segments;
+        this.offset = offset;
+        this.sizeInBytes = sizeInBytes;
+    }
+
+    /** Creates a {@link BinaryArray} instance for keys with the nested data 
type information. */
+    protected BinaryArray createKeyArrayInstance() {
+        return keyArraySupplier.get();
+    }
+
+    /** Creates a {@link BinaryArray} instance for values with the nested data 
type information. */
+    protected BinaryArray createValueArrayInstance() {
+        return valueArraySupplier.get();
+    }
+
+    /** Creates a nested {@link BinaryMap} with the nested data type 
information. */
+    protected BinaryMap createNestedMapInstance() {

Review Comment:
   Nested instances are already handled by `BinaryArray`, so we don't need a 
nested map instance. I mean, we expose the `InternalArray#getArray` and 
`InternalArray#getRow` interfaces, so `BinaryArray` needs nested row and nested 
array instances. But we don't expose such APIs for `InternalMap`, so it doesn't 
need a nested xxx instance. 
   
   Currently, this method is only used by `copy(...)`, which actually creates an 
instance of the map itself, so you should create another method for this 
purpose, like `createMapInstance`.



##########
fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java:
##########
@@ -286,15 +295,14 @@ static FieldReader createFieldReader(DataType fieldType) {
                 DataType elementType = ((ArrayType) 
fieldType).getElementType();
                 fieldReader = (reader, pos) -> reader.readArray(elementType);
                 break;
+            case MAP:
+                fieldReader = (reader, pos) -> reader.readMap();

Review Comment:
   ditto



##########
fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java:
##########
@@ -1002,18 +1002,30 @@ public static BinaryArray readBinaryArray(
         return reusedArray;
     }
 
+    public static BinaryMap readBinaryMap(
+            MemorySegment[] segments, int baseOffset, long offsetAndSize) {

Review Comment:
   We should do what `readBinaryArray` does and pass in a reused `BinaryMap` 
instead of hard-coding `new BinaryMap`. 



##########
fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowMapColumnVector.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.arrow.vectors;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.columnar.ColumnVector;
+import org.apache.fluss.row.columnar.ColumnarMap;
+import org.apache.fluss.row.columnar.MapColumnVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/** ArrowMapColumnVector is a wrapper class for Arrow MapVector. */
+public class ArrowMapColumnVector implements MapColumnVector {
+    private boolean inited = false;
+    private final FieldVector vector;
+    private final DataType keyType;
+    private final DataType valueType;
+    private ColumnVector keyColumnVector;
+    private ColumnVector valueColumnVector;
+
+    public ArrowMapColumnVector(FieldVector vector, DataType keyType, DataType 
valueType) {
+        this.vector = vector;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    private void init() {
+        if (!inited) {
+            FieldVector mapVector = ((MapVector) vector).getDataVector();
+            StructVector structVector = (StructVector) mapVector;
+            FieldVector keyVector = 
structVector.getChildrenFromFields().get(0);
+            FieldVector valueVector = 
structVector.getChildrenFromFields().get(1);
+            this.keyColumnVector = 
ArrowUtils.createArrowColumnVector(keyVector, keyType);
+            this.valueColumnVector = 
ArrowUtils.createArrowColumnVector(valueVector, valueType);
+            inited = true;
+        }
+    }
+
+    @Override
+    public InternalMap getMap(int i) {
+        init();
+        MapVector mapVector = (MapVector) vector;
+        int start = mapVector.getElementStartIndex(i);
+        int end = mapVector.getElementEndIndex(i);
+
+        return new ColumnarMap(keyColumnVector, valueColumnVector, start, end 
- start);
+    }
+
+    public ColumnVector getKeyColumnVector() {
+        init();
+        return keyColumnVector;
+    }
+
+    public ColumnVector getValueColumnVector() {
+        init();
+        return valueColumnVector;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    public ColumnVector[] getChildren() {

Review Comment:
   unused, remove



##########
fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowMapColumnVector.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.arrow.vectors;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.columnar.ColumnVector;
+import org.apache.fluss.row.columnar.ColumnarMap;
+import org.apache.fluss.row.columnar.MapColumnVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/** ArrowMapColumnVector is a wrapper class for Arrow MapVector. */
+public class ArrowMapColumnVector implements MapColumnVector {
+    private boolean inited = false;
+    private final FieldVector vector;
+    private final DataType keyType;
+    private final DataType valueType;
+    private ColumnVector keyColumnVector;
+    private ColumnVector valueColumnVector;
+
+    public ArrowMapColumnVector(FieldVector vector, DataType keyType, DataType 
valueType) {
+        this.vector = vector;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    private void init() {
+        if (!inited) {
+            FieldVector mapVector = ((MapVector) vector).getDataVector();
+            StructVector structVector = (StructVector) mapVector;
+            FieldVector keyVector = 
structVector.getChildrenFromFields().get(0);
+            FieldVector valueVector = 
structVector.getChildrenFromFields().get(1);
+            this.keyColumnVector = 
ArrowUtils.createArrowColumnVector(keyVector, keyType);
+            this.valueColumnVector = 
ArrowUtils.createArrowColumnVector(valueVector, valueType);
+            inited = true;
+        }
+    }

Review Comment:
   initialize in the constructor and remove this method. 



##########
fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java:
##########
@@ -184,6 +185,29 @@ static ElementGetter createDeepElementGetter(DataType 
fieldType) {
                             return new GenericArray(objs);
                         };
                 break;
+            case MAP:
+                DataType keyType = ((org.apache.fluss.types.MapType) 
fieldType).getKeyType();
+                DataType valueType = ((org.apache.fluss.types.MapType) 
fieldType).getValueType();

Review Comment:
   replace qualified name with imports



##########
fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java:
##########
@@ -339,6 +342,16 @@ public static ArrowFieldWriter 
createArrowFieldWriter(FieldVector vector, DataTy
             FieldVector elementFieldVector = ((ListVector) 
vector).getDataVector();
             return new ArrowArrayWriter(
                     vector, 
ArrowUtils.createArrowFieldWriter(elementFieldVector, elementType));
+        } else if (vector instanceof MapVector && dataType instanceof MapType) 
{
+            MapType mapType = (MapType) dataType;
+            MapVector mapVector = (MapVector) vector;
+            StructVector structVector = (StructVector) 
mapVector.getDataVector();
+            FieldVector keyVector = structVector.getChild("key");
+            FieldVector valueVector = structVector.getChild("value");

Review Comment:
   use Arrow's `MapVector.KEY_NAME` and `MapVector.VALUE_NAME`.



##########
fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java:
##########
@@ -285,7 +285,8 @@ private static DataType deserializeMap(JsonNode 
dataTypeNode) {
         final DataType keyType = 
DataTypeJsonSerde.INSTANCE.deserialize(keyNode);
         final JsonNode valueNode = dataTypeNode.get(FIELD_NAME_VALUE_TYPE);
         final DataType valueType = 
DataTypeJsonSerde.INSTANCE.deserialize(valueNode);
-        return new MapType(keyType, valueType);
+        final DataType nonNullableKeyType = keyType.isNullable() ? 
keyType.copy(false) : keyType;

Review Comment:
   Do not transform the data type during deserialization; this is very hacky. 
If the key shouldn't be nullable, then this should be guaranteed by the 
`MapType` constructor, not by the type serialization/deserialization phase. 
Serialization/deserialization should just focus on how to read types from and 
write types to JSON.



##########
fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java:
##########
@@ -184,6 +185,29 @@ static ElementGetter createDeepElementGetter(DataType 
fieldType) {
                             return new GenericArray(objs);
                         };
                 break;
+            case MAP:
+                DataType keyType = ((org.apache.fluss.types.MapType) 
fieldType).getKeyType();
+                DataType valueType = ((org.apache.fluss.types.MapType) 
fieldType).getValueType();
+                ElementGetter keyGetter = createDeepElementGetter(keyType);
+                ElementGetter valueGetter = createDeepElementGetter(valueType);
+                elementGetter =
+                        (array, pos) -> {
+                            InternalMap inner = array.getMap(pos);
+                            InternalArray keys = inner.keyArray();
+                            InternalArray values = inner.valueArray();
+                            Object[] keyObjs = new Object[keys.size()];
+                            Object[] valueObjs = new Object[values.size()];
+                            for (int i = 0; i < keys.size(); i++) {
+                                keyObjs[i] = keyGetter.getElementOrNull(keys, 
i);
+                                valueObjs[i] = 
valueGetter.getElementOrNull(values, i);

Review Comment:
   We don't need the temporary `keyObjs` and `valueObjs`; put the entries 
directly into the map, which reduces small-object overhead. 



##########
fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java:
##########
@@ -184,6 +185,29 @@ static ElementGetter createDeepElementGetter(DataType 
fieldType) {
                             return new GenericArray(objs);
                         };
                 break;
+            case MAP:
+                DataType keyType = ((org.apache.fluss.types.MapType) 
fieldType).getKeyType();
+                DataType valueType = ((org.apache.fluss.types.MapType) 
fieldType).getValueType();
+                ElementGetter keyGetter = createDeepElementGetter(keyType);
+                ElementGetter valueGetter = createDeepElementGetter(valueType);
+                elementGetter =
+                        (array, pos) -> {
+                            InternalMap inner = array.getMap(pos);
+                            InternalArray keys = inner.keyArray();
+                            InternalArray values = inner.valueArray();
+                            Object[] keyObjs = new Object[keys.size()];
+                            Object[] valueObjs = new Object[values.size()];
+                            for (int i = 0; i < keys.size(); i++) {
+                                keyObjs[i] = keyGetter.getElementOrNull(keys, 
i);
+                                valueObjs[i] = 
valueGetter.getElementOrNull(values, i);
+                            }
+                            java.util.Map<Object, Object> map = new 
java.util.HashMap<>();

Review Comment:
   replace qualified name with imports



##########
fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java:
##########
@@ -260,6 +263,28 @@ static FieldGetter createDeepFieldGetter(DataType 
fieldType, int fieldPos) {
                             return new GenericArray(objs);
                         };
                 break;
+            case MAP:
+                MapType mapType = (MapType) fieldType;
+                InternalArray.ElementGetter keyGetter =
+                        createDeepElementGetter(mapType.getKeyType());
+                InternalArray.ElementGetter valueGetter =
+                        createDeepElementGetter(mapType.getValueType());
+                fieldGetter =
+                        row -> {
+                            InternalMap map = row.getMap(fieldPos);
+                            Object[] keys = new Object[map.size()];
+                            Object[] values = new Object[map.size()];
+                            for (int i = 0; i < map.size(); i++) {
+                                keys[i] = 
keyGetter.getElementOrNull(map.keyArray(), i);
+                                values[i] = 
valueGetter.getElementOrNull(map.valueArray(), i);
+                            }
+                            java.util.Map<Object, Object> javaMap = new 
java.util.HashMap<>();

Review Comment:
   ditto



##########
fluss-common/src/main/java/org/apache/fluss/row/BinaryMap.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Binary implementation of {@link InternalMap} backed by {@link 
MemorySegment}s.
+ *
+ * <p>The binary layout of {@link BinaryMap}:
+ *
+ * <pre>
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ * </pre>
+ *
+ * <p>Influenced by Apache Spark UnsafeMapData.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public class BinaryMap extends BinarySection implements InternalMap {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient BinaryArray keys;
+    private transient BinaryArray values;
+
+    private final transient Supplier<BinaryArray> keyArraySupplier;
+    private final transient Supplier<BinaryArray> valueArraySupplier;
+    private final transient Supplier<BinaryMap> nestedMapSupplier;
+
+    public BinaryMap() {
+        this(
+                (Supplier<BinaryArray> & Serializable) 
PrimitiveBinaryArray::new,
+                (Supplier<BinaryArray> & Serializable) 
PrimitiveBinaryArray::new,
+                (Supplier<BinaryMap> & Serializable) BinaryMap::new);
+    }
+
+    public BinaryMap(
+            Supplier<BinaryArray> keyArraySupplier,
+            Supplier<BinaryArray> valueArraySupplier,
+            Supplier<BinaryMap> nestedMapSupplier) {
+        this.keyArraySupplier = keyArraySupplier;
+        this.valueArraySupplier = valueArraySupplier;
+        this.nestedMapSupplier = nestedMapSupplier;
+    }
+
+    @Override
+    public int size() {
+        return keys.size();
+    }
+
+    @Override
+    public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) 
{
+        // Read the numBytes of key array from the first 4 bytes.
+        final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
+        assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") 
should >= 0";
+        final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
+        assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + 
") should >= 0";
+
+        // see BinarySection.readObject, on this call stack, keys and values 
are not initialized
+        if (keys == null) {
+            keys = createKeyArrayInstance();
+        }
+        keys.pointTo(segments, offset + 4, keyArrayBytes);
+        if (values == null) {
+            values = createValueArrayInstance();
+        }
+        values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
+
+        assert keys.size() == values.size();
+
+        this.segments = segments;
+        this.offset = offset;
+        this.sizeInBytes = sizeInBytes;
+    }
+
+    /** Creates a {@link BinaryArray} instance for keys with the nested data 
type information. */
+    protected BinaryArray createKeyArrayInstance() {
+        return keyArraySupplier.get();
+    }
+
+    /** Creates a {@link BinaryArray} instance for values with the nested data 
type information. */
+    protected BinaryArray createValueArrayInstance() {
+        return valueArraySupplier.get();
+    }
+
+    /** Creates a nested {@link BinaryMap} with the nested data type 
information. */
+    protected BinaryMap createNestedMapInstance() {
+        return nestedMapSupplier.get();
+    }

Review Comment:
   Follow the pattern used in `BinaryArray` by declaring these as **abstract 
methods** and implementing them in subclasses.  
   
   This approach eliminates the need for non-serializable fields like 
`keyArraySupplier`, which become `null` after deserialization and can lead to 
`NullPointerException` here at runtime.



##########
fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java:
##########
@@ -434,9 +450,24 @@ private static Field toArrowField(String fieldName, 
DataType logicalType) {
             for (DataField field : rowType.getFields()) {
                 children.add(toArrowField(field.getName(), field.getType()));
             }
+        } else if (logicalType instanceof MapType) {
+            MapType mapType = (MapType) logicalType;
+            DataType keyType = mapType.getKeyType();
+            if (keyType.isNullable()) {
+                throw new IllegalArgumentException(

Review Comment:
   If we enforce non-null keys at this point, I suggest automatically 
converting the key type to non-nullable in the `MapType` constructor. That 
means we treat map keys as non-nullable by default, even if the user specifies 
a nullable type.
   
   Without this conversion, users would be forced to explicitly declare 
non-nullable keys (e.g., `MAP<INT NOT NULL, STRING>`), which is not 
user-friendly. 
   
   For example, the following standard-looking DDL would fail:
   ```sql
   CREATE TABLE xxx (
     f1 MAP<INT, STRING>
   );
   ```
   
   By handling the nullability conversion internally, we align with user 
expectations while maintaining the required constraint that map keys must not 
be null. This conversion is similar to how we automatically convert data types 
of primary key fields to not-null.



##########
fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java:
##########
@@ -1002,18 +1002,30 @@ public static BinaryArray readBinaryArray(
         return reusedArray;
     }
 
+    public static BinaryMap readBinaryMap(
+            MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+        final int size = ((int) offsetAndSize);
+        int offset = (int) (offsetAndSize >> 32);
+        BinaryMap map = new BinaryMap();
+        map.pointTo(segments, offset + baseOffset, size);
+        return map;
+    }
+
     /** Read map data from segments. */
     public static InternalMap readMap(MemorySegment[] segments, int offset, 
int numBytes) {
-        // TODO: Map type support will be added in Issue #1973
-        throw new UnsupportedOperationException(
-                "Map type is not supported yet. Will be added in Issue 
#1973.");
+        return readMapData(segments, offset, numBytes);
     }
 
     /** Read map data from segments with long offset. */
     public static InternalMap readMap(MemorySegment[] segments, int offset, 
long numBytes) {
-        // TODO: Map type support will be added in Issue #1973
-        throw new UnsupportedOperationException(
-                "Map type is not supported yet. Will be added in Issue 
#1973.");
+        return readMapData(segments, offset, (int) numBytes);
+    }
+
+    /** Read map data from segments. */
+    public static BinaryMap readMapData(MemorySegment[] segments, int offset, 
int numBytes) {
+        BinaryMap map = new BinaryMap();
+        map.pointTo(segments, offset, numBytes);
+        return map;

Review Comment:
   unused, remove



##########
fluss-common/src/test/java/org/apache/fluss/row/columnar/ColumnarMapTest.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.columnar;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.InternalArray;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ColumnarMap}. */
+public class ColumnarMapTest {
+
+    @Test
+    public void testConstructorWithInternalArrays() {
+        Object[] keys = {
+            BinaryString.fromString("key1"),
+            BinaryString.fromString("key2"),
+            BinaryString.fromString("key3")
+        };
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray()).isEqualTo(keyArray);
+        assertThat(map.valueArray()).isEqualTo(valueArray);
+    }
+
+    @Test
+    public void testConstructorWithColumnVectors() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 5);
+
+        assertThat(map.size()).isEqualTo(5);
+        assertThat(map.keyArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.valueArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(1);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(10);
+    }
+
+    @Test
+    public void testConstructorWithOffsetAndNumElements() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
2, 2);
+
+        assertThat(map.size()).isEqualTo(2);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(3);
+        assertThat(map.keyArray().getInt(1)).isEqualTo(4);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(30);
+        assertThat(map.valueArray().getInt(1)).isEqualTo(40);
+    }
+
+    @Test
+    public void testConstructorWithNullKeyArray() {
+        Object[] values = {1, 2, 3};
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(null, valueArray))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("keyArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithNullValueArray() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        InternalArray keyArray = new GenericArray(keys);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("valueArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithMismatchedArraySizes() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, valueArray))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key array size and value array size 
must be equal");
+    }
+
+    @Test
+    public void testSize() {
+        Object[] keys = {
+            BinaryString.fromString("a"),
+            BinaryString.fromString("b"),
+            BinaryString.fromString("c"),
+            BinaryString.fromString("d")
+        };
+        Object[] values = {1, 2, 3, 4};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testEmptyMap() {
+        Object[] keys = {};
+        Object[] values = {};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(0);
+        assertThat(map.keyArray().size()).isEqualTo(0);
+        assertThat(map.valueArray().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testKeyArrayAndValueArray() {
+        Object[] keys = {
+            BinaryString.fromString("name"),
+            BinaryString.fromString("age"),
+            BinaryString.fromString("city")
+        };
+        Object[] values = {BinaryString.fromString("Alice"), 30, 
BinaryString.fromString("NYC")};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.keyArray()).isSameAs(keyArray);
+        assertThat(map.valueArray()).isSameAs(valueArray);
+
+        
assertThat(map.keyArray().getString(0)).isEqualTo(BinaryString.fromString("name"));
+        
assertThat(map.valueArray().getString(0)).isEqualTo(BinaryString.fromString("Alice"));
+        
assertThat(map.keyArray().getString(1)).isEqualTo(BinaryString.fromString("age"));
+        assertThat(map.valueArray().getInt(1)).isEqualTo(30);
+    }
+
+    @Test
+    public void testEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 2};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map1)).isTrue();
+        assertThat(map1.equals(map2)).isTrue();
+        assertThat(map2.equals(map1)).isTrue();
+    }
+
+    @Test
+    public void testNotEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 3};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map2)).isFalse();
+        assertThat(map1.equals(null)).isFalse();
+        assertThat(map1.equals("string")).isFalse();
+    }
+
+    @Test
+    public void testHashCode() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {100, 200};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map1 = new ColumnarMap(keyArray, valueArray);
+        ColumnarMap map2 = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map1.hashCode()).isEqualTo(map2.hashCode());
+    }
+
+    @Test
+    public void testToString() {
+        Object[] keys = {BinaryString.fromString("k1")};
+        Object[] values = {42};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        String result = map.toString();
+        assertThat(result).contains("ColumnarMap");
+        assertThat(result).contains("keyArray");
+        assertThat(result).contains("valueArray");
+    }
+
+    @Test
+    public void testMapWithNullValues() {
+        Object[] keys = {
+            BinaryString.fromString("k1"),
+            BinaryString.fromString("k2"),
+            BinaryString.fromString("k3")
+        };
+        Object[] values = {1, null, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.valueArray().isNullAt(0)).isFalse();
+        assertThat(map.valueArray().isNullAt(1)).isTrue();
+        assertThat(map.valueArray().isNullAt(2)).isFalse();
+    }
+
+    @Test
+    public void testMapWithDifferentDataTypes() {
+        long[] keys = {1L, 2L, 3L};
+        double[] values = {1.1, 2.2, 3.3};
+
+        TestLongColumnVector keyColumnVector = new TestLongColumnVector(keys);
+        TestDoubleColumnVector valueColumnVector = new 
TestDoubleColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 3);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray().getLong(0)).isEqualTo(1L);
+        assertThat(map.valueArray().getDouble(0)).isEqualTo(1.1);
+        assertThat(map.keyArray().getLong(2)).isEqualTo(3L);
+        assertThat(map.valueArray().getDouble(2)).isEqualTo(3.3);
+    }
+
+    private static class TestIntColumnVector implements IntColumnVector {
+        private final int[] values;
+        private final boolean[] nulls;
+
+        TestIntColumnVector(int[] values) {
+            this.values = values;
+            this.nulls = new boolean[values.length];

Review Comment:
   as this is a primitive array, all elements should be not-null, rather than 
null. 



##########
fluss-common/src/main/java/org/apache/fluss/row/columnar/VectorizedColumnBatch.java:
##########
@@ -43,6 +43,10 @@ public int getFieldCount() {
         return columns.length;
     }
 
+    public ColumnVector getColumn(int colId) {

Review Comment:
   It's very strange to expose the underlying `ColumnVector` directly. Why not 
introduce a `getMap` on the `VectorizedColumnBatch`, like other complex types 
`getArray`, `getRow`?



##########
fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowMapColumnVector.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.arrow.vectors;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.columnar.ColumnVector;
+import org.apache.fluss.row.columnar.ColumnarMap;
+import org.apache.fluss.row.columnar.MapColumnVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/** ArrowMapColumnVector is a wrapper class for Arrow MapVector. */
+public class ArrowMapColumnVector implements MapColumnVector {
+    private boolean inited = false;
+    private final FieldVector vector;
+    private final DataType keyType;
+    private final DataType valueType;
+    private ColumnVector keyColumnVector;
+    private ColumnVector valueColumnVector;
+
+    public ArrowMapColumnVector(FieldVector vector, DataType keyType, DataType 
valueType) {
+        this.vector = vector;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    private void init() {
+        if (!inited) {
+            FieldVector mapVector = ((MapVector) vector).getDataVector();
+            StructVector structVector = (StructVector) mapVector;
+            FieldVector keyVector = 
structVector.getChildrenFromFields().get(0);
+            FieldVector valueVector = 
structVector.getChildrenFromFields().get(1);

Review Comment:
   Instead of direct access, you should use  
   ```java
   structVector.getChild(MapVector.KEY_NAME)
   ```  
   and  
   ```java
   structVector.getChild(MapVector.VALUE_NAME)
   ```  
   to obtain the `keyVector` and `valueVector`, respectively.
   
   Additionally, to maintain consistency with `ArrowArrayColumnVector` and 
`ArrowRowColumnVector`, it’s better to move the initialization of 
`keyColumnVector` and `valueColumnVector` **outside** of 
`ArrowMapColumnVector`.  
   
   This also allows us to avoid exposing `ArrowUtils.createArrowColumnVector` 
as `public`, preserving better encapsulation and alignment with existing 
patterns.



##########
fluss-common/src/test/java/org/apache/fluss/row/GenericMapTest.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link GenericMap}. */
+public class GenericMapTest {
+
+    @Test
+    public void testConstructorAndGet() {
+        Map<String, Integer> javaMap = new HashMap<>();
+        javaMap.put("a", 1);
+        javaMap.put("b", 2);
+        javaMap.put("c", 3);
+
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        assertThat(genericMap.get("a")).isEqualTo(1);
+        assertThat(genericMap.get("b")).isEqualTo(2);
+        assertThat(genericMap.get("c")).isEqualTo(3);
+        assertThat(genericMap.get("d")).isNull();
+    }
+
+    @Test
+    public void testSize() {
+        Map<String, Integer> javaMap = new HashMap<>();
+        javaMap.put("key1", 100);
+        javaMap.put("key2", 200);
+        javaMap.put("key3", 300);
+        javaMap.put("key4", 400);
+
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        assertThat(genericMap.size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testEmptyMap() {
+        Map<String, Integer> javaMap = new HashMap<>();
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        assertThat(genericMap.size()).isEqualTo(0);
+        assertThat(genericMap.keyArray().size()).isEqualTo(0);
+        assertThat(genericMap.valueArray().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testKeyArrayAndValueArray() {
+        Map<String, Integer> javaMap = new LinkedHashMap<>();
+        javaMap.put("first", 1);
+        javaMap.put("second", 2);
+        javaMap.put("third", 3);
+
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        InternalArray keyArray = genericMap.keyArray();
+        InternalArray valueArray = genericMap.valueArray();
+
+        assertThat(keyArray.size()).isEqualTo(3);
+        assertThat(valueArray.size()).isEqualTo(3);
+
+        assertThat(keyArray.isNullAt(0)).isFalse();
+        assertThat(valueArray.isNullAt(0)).isFalse();
+    }
+
+    @Test
+    public void testKeyArrayCaching() {
+        Map<String, Integer> javaMap = new HashMap<>();
+        javaMap.put("key", 42);
+
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        InternalArray keyArray1 = genericMap.keyArray();
+        InternalArray keyArray2 = genericMap.keyArray();
+
+        assertThat(keyArray1).isSameAs(keyArray2);
+    }
+
+    @Test
+    public void testValueArrayCaching() {
+        Map<String, Integer> javaMap = new HashMap<>();
+        javaMap.put("key", 42);
+
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        InternalArray valueArray1 = genericMap.valueArray();
+        InternalArray valueArray2 = genericMap.valueArray();
+
+        assertThat(valueArray1).isSameAs(valueArray2);
+    }
+
+    @Test
+    public void testMapWithNullValues() {
+        Map<String, Integer> javaMap = new HashMap<>();
+        javaMap.put("key1", 1);
+        javaMap.put("key2", null);
+        javaMap.put("key3", 3);
+
+        GenericMap genericMap = new GenericMap(javaMap);
+
+        assertThat(genericMap.size()).isEqualTo(3);
+        assertThat(genericMap.get("key1")).isEqualTo(1);
+        assertThat(genericMap.get("key2")).isNull();
+        assertThat(genericMap.get("key3")).isEqualTo(3);
+    }
+
+    @Test
+    public void testEquals() {

Review Comment:
   Remove all the tests about `equals`, `toString` and `hashCode`. Tests don't 
need to cover this logic. If we want to improve coverage, these methods should 
already be invoked in other tests and thus already have test coverage. 



##########
fluss-common/src/test/java/org/apache/fluss/row/map/IndexedMapTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.map;
+
+import org.apache.fluss.row.BinaryArray;
+import org.apache.fluss.row.BinaryMap;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.row.serializer.ArraySerializer;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.MapType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IndexedMap}. */
+public class IndexedMapTest {

Review Comment:
   Could you please remove `IndexedMapTest` and `CompactedMapTest`?  
   
   These tests provide little to no real value: they don’t verify any behavior 
or assert expected values; they only check that the constructed map is not 
`null`. This doesn’t meaningfully validate correctness or prevent regressions.
   
   If the goal is to improve test coverage, those code paths are already 
covered by other, more substantive tests. And if certain methods truly remain 
uncovered, we should add **focused, purposeful tests** for those specific 
cases, rather than relying on AI-generated boilerplate that inflates the test 
suite without adding confidence in the code.



##########
fluss-common/src/test/java/org/apache/fluss/row/map/IndexedMapTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.map;
+
+import org.apache.fluss.row.BinaryArray;
+import org.apache.fluss.row.BinaryMap;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.row.serializer.ArraySerializer;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.MapType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IndexedMap}. */
+public class IndexedMapTest {
+
+    @Test
+    public void testConstructorWithSimpleTypes() {
+        DataType keyType = DataTypes.INT();
+        DataType valueType = DataTypes.STRING();
+
+        IndexedMap map = new IndexedMap(keyType, valueType);
+
+        assertThat(map).isNotNull();
+        assertThat(map).isInstanceOf(BinaryMap.class);
+    }
+
+    @Test
+    public void testConstructorWithMapValueType() {
+        DataType keyType = DataTypes.STRING();
+        DataType innerKeyType = DataTypes.INT();
+        DataType innerValueType = DataTypes.STRING();
+        MapType valueType = DataTypes.MAP(innerKeyType, innerValueType);
+
+        IndexedMap map = new IndexedMap(keyType, valueType);
+
+        assertThat(map).isNotNull();
+    }
+
+    @Test
+    public void testMapInstanceCreation() {
+        DataType keyType = DataTypes.STRING();
+        DataType innerKeyType = DataTypes.INT();
+        DataType innerValueType = DataTypes.STRING();
+        MapType valueType = DataTypes.MAP(innerKeyType, innerValueType);
+
+        IndexedMap map = new IndexedMap(keyType, valueType);
+
+        assertThat(map).isNotNull();
+        assertThat(map).isInstanceOf(BinaryMap.class);
+    }
+
+    @Test
+    public void testWithIntKeyAndStringValue() {
+        DataType keyType = DataTypes.INT();
+        DataType valueType = DataTypes.STRING();
+
+        IndexedMap map = new IndexedMap(keyType, valueType);
+
+        GenericArray keyArray = new GenericArray(new Object[] {1, 2, 3});
+        GenericArray valueArray =
+                new GenericArray(
+                        new Object[] {
+                            org.apache.fluss.row.BinaryString.fromString("a"),
+                            org.apache.fluss.row.BinaryString.fromString("b"),
+                            org.apache.fluss.row.BinaryString.fromString("c")

Review Comment:
   replace qualified names with static imports



##########
fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java:
##########
@@ -120,8 +120,9 @@ private void assertAllTypeEqualsForReader(IndexedRowReader 
reader) {
                                 GenericArray.of(fromString("a"), null, 
fromString("c")),
                                 null,
                                 GenericArray.of(fromString("hello"), 
fromString("world"))));
+        assertThat(reader.readMap().size()).isEqualTo(3);

Review Comment:
   assert the map values, rather than just asserting the size



##########
fluss-common/src/test/java/org/apache/fluss/row/columnar/ColumnarMapTest.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.columnar;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.InternalArray;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ColumnarMap}. */
+public class ColumnarMapTest {
+
+    @Test
+    public void testConstructorWithInternalArrays() {
+        Object[] keys = {
+            BinaryString.fromString("key1"),
+            BinaryString.fromString("key2"),
+            BinaryString.fromString("key3")
+        };
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray()).isEqualTo(keyArray);
+        assertThat(map.valueArray()).isEqualTo(valueArray);
+    }
+
+    @Test
+    public void testConstructorWithColumnVectors() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 5);
+
+        assertThat(map.size()).isEqualTo(5);
+        assertThat(map.keyArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.valueArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(1);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(10);
+    }
+
+    @Test
+    public void testConstructorWithOffsetAndNumElements() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
2, 2);
+
+        assertThat(map.size()).isEqualTo(2);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(3);
+        assertThat(map.keyArray().getInt(1)).isEqualTo(4);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(30);
+        assertThat(map.valueArray().getInt(1)).isEqualTo(40);
+    }
+
+    @Test
+    public void testConstructorWithNullKeyArray() {
+        Object[] values = {1, 2, 3};
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(null, valueArray))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("keyArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithNullValueArray() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        InternalArray keyArray = new GenericArray(keys);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("valueArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithMismatchedArraySizes() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, valueArray))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key array size and value array size 
must be equal");
+    }
+
+    @Test
+    public void testSize() {
+        Object[] keys = {
+            BinaryString.fromString("a"),
+            BinaryString.fromString("b"),
+            BinaryString.fromString("c"),
+            BinaryString.fromString("d")
+        };
+        Object[] values = {1, 2, 3, 4};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testEmptyMap() {
+        Object[] keys = {};
+        Object[] values = {};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(0);
+        assertThat(map.keyArray().size()).isEqualTo(0);
+        assertThat(map.valueArray().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testKeyArrayAndValueArray() {
+        Object[] keys = {
+            BinaryString.fromString("name"),
+            BinaryString.fromString("age"),
+            BinaryString.fromString("city")
+        };
+        Object[] values = {BinaryString.fromString("Alice"), 30, 
BinaryString.fromString("NYC")};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.keyArray()).isSameAs(keyArray);
+        assertThat(map.valueArray()).isSameAs(valueArray);
+
+        
assertThat(map.keyArray().getString(0)).isEqualTo(BinaryString.fromString("name"));
+        
assertThat(map.valueArray().getString(0)).isEqualTo(BinaryString.fromString("Alice"));
+        
assertThat(map.keyArray().getString(1)).isEqualTo(BinaryString.fromString("age"));
+        assertThat(map.valueArray().getInt(1)).isEqualTo(30);
+    }
+
+    @Test
+    public void testEquals() {
+        Object[] keys1 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values1 = {1, 2};
+        InternalArray keyArray1 = new GenericArray(keys1);
+        InternalArray valueArray1 = new GenericArray(values1);
+        ColumnarMap map1 = new ColumnarMap(keyArray1, valueArray1);
+
+        Object[] keys2 = {BinaryString.fromString("k1"), 
BinaryString.fromString("k2")};
+        Object[] values2 = {1, 2};
+        InternalArray keyArray2 = new GenericArray(keys2);
+        InternalArray valueArray2 = new GenericArray(values2);
+        ColumnarMap map2 = new ColumnarMap(keyArray2, valueArray2);
+
+        assertThat(map1.equals(map1)).isTrue();
+        assertThat(map1.equals(map2)).isTrue();
+        assertThat(map2.equals(map1)).isTrue();
+    }
+
+    @Test
+    public void testNotEquals() {

Review Comment:
   Remove all the tests for `equals`, `toString` and `hashCode`. Tests don't 
need to cover this logic. If we want to improve coverage, these methods should 
already be invoked in other tests and thus already have test coverage. 



##########
fluss-common/src/test/java/org/apache/fluss/row/columnar/ColumnarMapTest.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.columnar;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.InternalArray;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ColumnarMap}. */
+public class ColumnarMapTest {
+
+    @Test
+    public void testConstructorWithInternalArrays() {
+        Object[] keys = {
+            BinaryString.fromString("key1"),
+            BinaryString.fromString("key2"),
+            BinaryString.fromString("key3")
+        };
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        ColumnarMap map = new ColumnarMap(keyArray, valueArray);
+
+        assertThat(map.size()).isEqualTo(3);
+        assertThat(map.keyArray()).isEqualTo(keyArray);
+        assertThat(map.valueArray()).isEqualTo(valueArray);
+    }
+
+    @Test
+    public void testConstructorWithColumnVectors() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
0, 5);
+
+        assertThat(map.size()).isEqualTo(5);
+        assertThat(map.keyArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.valueArray()).isInstanceOf(ColumnarArray.class);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(1);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(10);
+    }
+
+    @Test
+    public void testConstructorWithOffsetAndNumElements() {
+        int[] keys = {1, 2, 3, 4, 5};
+        int[] values = {10, 20, 30, 40, 50};
+
+        TestIntColumnVector keyColumnVector = new TestIntColumnVector(keys);
+        TestIntColumnVector valueColumnVector = new 
TestIntColumnVector(values);
+
+        ColumnarMap map = new ColumnarMap(keyColumnVector, valueColumnVector, 
2, 2);
+
+        assertThat(map.size()).isEqualTo(2);
+        assertThat(map.keyArray().getInt(0)).isEqualTo(3);
+        assertThat(map.keyArray().getInt(1)).isEqualTo(4);
+        assertThat(map.valueArray().getInt(0)).isEqualTo(30);
+        assertThat(map.valueArray().getInt(1)).isEqualTo(40);
+    }
+
+    @Test
+    public void testConstructorWithNullKeyArray() {
+        Object[] values = {1, 2, 3};
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(null, valueArray))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("keyArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithNullValueArray() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        InternalArray keyArray = new GenericArray(keys);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("valueArray must not be null");
+    }
+
+    @Test
+    public void testConstructorWithMismatchedArraySizes() {
+        Object[] keys = {BinaryString.fromString("key1"), 
BinaryString.fromString("key2")};
+        Object[] values = {1, 2, 3};
+
+        InternalArray keyArray = new GenericArray(keys);
+        InternalArray valueArray = new GenericArray(values);
+
+        assertThatThrownBy(() -> new ColumnarMap(keyArray, valueArray))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key array size and value array size 
must be equal");
+    }
+
+    @Test
+    public void testSize() {

Review Comment:
   Remove this test, as it has already been covered by other test methods.



##########
fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java:
##########
@@ -124,13 +123,27 @@ public void testValueSetterWithDecimalType() {
     }
 
     @Test
-    public void testCreateValueSetterForMapThrowsException() {
-        assertThatThrownBy(
-                        () ->
-                                createPrimitiveValueWriter(
-                                        DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Map type is not supported yet");
+    public void testValueSetterWithMapType() {
+        BinaryArray array = new 
org.apache.fluss.row.array.PrimitiveBinaryArray();
+        BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+
+        BinaryWriter.ValueWriter setter =
+                BinaryWriter.createValueWriter(
+                        DataTypes.MAP(DataTypes.INT().copy(false), 
DataTypes.STRING()),
+                        BinaryRow.BinaryRowFormat.ALIGNED);

Review Comment:
   Use `COMPACTED` format in tests by default as it is more widely used than 
`ALIGNED` and `INDEXED`. 



##########
fluss-common/src/test/java/org/apache/fluss/row/serializer/MapSerializerTest.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.serializer;
+
+import org.apache.fluss.row.BinaryMap;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.map.AlignedMap;
+import org.apache.fluss.row.map.CompactedMap;
+import org.apache.fluss.row.map.IndexedMap;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.ALIGNED;
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED;
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MapSerializer}. */
+public class MapSerializerTest {

Review Comment:
   remove this as well! See the comments on `IndexedMapTest`.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java:
##########
@@ -125,12 +125,16 @@ void testConverter() throws Exception {
             assertThat(stringArray2)
                     .isEqualTo(new BinaryString[] {fromString("hello"), 
fromString("world")});
 
+            // map
+            assertThat(flinkRow.getMap(22)).isNotNull();
+            assertThat(flinkRow.getMap(22).size()).isEqualTo(3);

Review Comment:
   assert map values.



##########
fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java:
##########
@@ -124,13 +123,27 @@ public void testValueSetterWithDecimalType() {
     }
 
     @Test
-    public void testCreateValueSetterForMapThrowsException() {
-        assertThatThrownBy(
-                        () ->
-                                createPrimitiveValueWriter(
-                                        DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Map type is not supported yet");
+    public void testValueSetterWithMapType() {
+        BinaryArray array = new 
org.apache.fluss.row.array.PrimitiveBinaryArray();
+        BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+
+        BinaryWriter.ValueWriter setter =
+                BinaryWriter.createValueWriter(
+                        DataTypes.MAP(DataTypes.INT().copy(false), 
DataTypes.STRING()),
+                        BinaryRow.BinaryRowFormat.ALIGNED);
+
+        GenericMap map1 =
+                GenericMap.of(1, BinaryString.fromString("one"), 2, 
BinaryString.fromString("two"));
+        GenericMap map2 =
+                GenericMap.of(
+                        3, BinaryString.fromString("three"), 4, 
BinaryString.fromString("four"));
+
+        setter.writeValue(writer, 0, map1);
+        setter.writeValue(writer, 1, map2);
+        writer.complete();
+
+        assertThat(array.getMap(0).size()).isEqualTo(2);
+        assertThat(array.getMap(1).size()).isEqualTo(2);

Review Comment:
   Please assert the keys and values; you can do so by comparing arrays. 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java:
##########
@@ -162,8 +165,30 @@ static FlussDeserializationConverter 
createInternalConverter(DataType flussDataT
                     return new GenericArrayData(flinkArray);
                 };
             case MAP:
-                // TODO: Add Map type support in future
-                throw new UnsupportedOperationException("Map type not 
supported yet");
+                MapType mapType = (MapType) flussDataType;
+                InternalArray.ElementGetter keyGetter =
+                        
InternalArray.createElementGetter(mapType.getKeyType());
+                InternalArray.ElementGetter valueGetter =
+                        
InternalArray.createElementGetter(mapType.getValueType());
+                FlussDeserializationConverter keyConverter =
+                        createNullableInternalConverter(mapType.getKeyType());
+                FlussDeserializationConverter valueConverter =
+                        
createNullableInternalConverter(mapType.getValueType());
+                return (flussField) -> {
+                    InternalMap flussMap = (InternalMap) flussField;
+                    InternalArray keyArray = flussMap.keyArray();
+                    InternalArray valueArray = flussMap.valueArray();
+                    int size = flussMap.size();
+                    java.util.Map<Object, Object> javaMap = new 
java.util.HashMap<>();

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to