This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new bf823e1  support MAP type with flattened columnar storage (#53)
bf823e1 is described below

commit bf823e1dcf5b7e1aff0333803e52b1843f629836
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Jun 7 09:53:26 2026 +0800

    support MAP type with flattened columnar storage (#53)
    
    MAP<K, V> columns are decomposed into three physical columns within
    the same bucket: lengths (INT32), keys (type K), and values (type V).
    All three independently benefit from DICT/CONST/PLAIN encoding.
    
    Recursive nesting is fully supported:
    - ARRAY<MAP<K,V>>, MAP<K, ARRAY<V>>, MAP<K, MAP<K2,V2>>
    - Arbitrary depth like MAP<STRING, MAP<STRING, ARRAY<MAP<STRING, STRING>>>>
    
    Schema metadata roundtrip preserves all MAP field names (entries, keys,
    values). Sorted MAP is rejected at validation (not supported).
    
    The format is fully backward compatible: existing files remain readable
    and no version bump is needed. MAP uses the new type byte 19, so older
    readers reject files that contain MAP columns at schema validation.
---
 core/src/bucket_reader.rs                          | 205 ++++++++-
 core/src/bucket_writer.rs                          | 312 +++++++++++---
 core/src/types.rs                                  |  85 +++-
 core/tests/array_type_test.rs                      | 456 +++++++++++++++++++++
 core/tests/gen_fixtures.rs                         |  88 ++++
 core/tests/testdata/v1_with_map.mosaic             | Bin 0 -> 133 bytes
 cpp/test_mosaic.cpp                                |  57 ++-
 docs/cpp-api.html                                  |  10 +-
 docs/design.html                                   |  42 +-
 docs/java-api.html                                 |  12 +
 docs/python-api.html                               |   6 +-
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  |  71 ++++
 python/tests/test_mosaic.py                        |  56 +++
 13 files changed, 1305 insertions(+), 95 deletions(-)

diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index f81165e..e45b9fa 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -360,36 +360,199 @@ fn reassemble_list_columns(
     _num_primary: usize,
     num_rows: usize,
 ) {
-    for child in children.iter().rev() {
-        let phys_idx = child.physical_index;
+    let mut processed = vec![false; children.len()];
+
+    // Process innermost children first (highest physical index → lowest)
+    for idx in (0..children.len()).rev() {
+        if processed[idx] {
+            continue;
+        }
+        let child = &children[idx];
         let parent = child.parent_logical_col;
-        let values = arrays[phys_idx].clone();
-
-        // Find the lengths column for this child:
-        // - If the previous physical column (phys_idx - 1) is a child with 
the same parent,
-        //   this is a nested child and lengths are at phys_idx - 1.
-        // - Otherwise, this is a first-level child and lengths are at the 
parent primary column.
-        let is_nested = children
-            .iter()
-            .any(|c| c.physical_index == phys_idx - 1 && c.parent_logical_col 
== parent);
-        let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
-
-        let lengths = arrays[lengths_idx].clone();
-        let lengths_rows = lengths.len();
-
-        let element_field = child.element_field.clone();
-        let reassembled = reassemble_list_array(lengths, values, 
element_field, lengths_rows);
-        arrays[lengths_idx] = reassembled;
+        let phys_idx = child.physical_index;
+        let elem_dt = child.element_field.data_type();
+
+        // What type is above us (the container that produced this child)?
+        // Look at the element_field of the child that stores lengths for us,
+        // or the logical type at the parent position.
+        // For a first-level child, the lengths are at `parent` (primary 
column).
+        // For a nested child (e.g., inner values of ARRAY<ARRAY<INT>>),
+        // the lengths are at the previous sibling that was already 
reassembled.
+        //
+        // Key insight: the container type that produced this child determines
+        // whether it's an ARRAY child (1 child) or MAP child (2 children).
+        // We detect MAP by checking if the NEXT child (idx+1) has the same
+        // "lengths column" — meaning they're siblings of the same MAP.
+
+        // Determine if this child is part of a MAP pair.
+        // MAP produces two consecutive children: keys then values.
+        // Nested ARRAY produces children where the first child's element_field
+        // has a complex data_type (List/Map), indicating a deeper level.
+        //
+        // A "MAP sibling pair" is: two consecutive children with same parent,
+        // where the first child's element_field is NOT itself a complex type
+        // (complex element_field means it's a nested ARRAY/MAP intermediate).
+
+        let prev_is_map_key = idx > 0
+            && children[idx - 1].parent_logical_col == parent
+            && !processed[idx - 1]
+            && !matches!(
+                children[idx - 1].element_field.data_type(),
+                DataType::List(_) | DataType::Map(_, _)
+            );
+
+        if prev_is_map_key {
+            // This child is the VALUES of a MAP (the keys are at idx-1)
+            continue;
+        }
+
+        let next_is_map_value = idx + 1 < children.len()
+            && children[idx + 1].parent_logical_col == parent
+            && !processed[idx + 1]
+            && !matches!(
+                child.element_field.data_type(),
+                DataType::List(_) | DataType::Map(_, _)
+            );
+
+        if next_is_map_value {
+            // MAP: this child is KEYS, next child is VALUES
+            let val_child = &children[idx + 1];
+            let keys = arrays[phys_idx].clone();
+            let values = arrays[val_child.physical_index].clone();
+
+            // Find lengths column: a child with complex element_field at 
phys_idx-1
+            // stores the MAP lengths (it was produced by expand_element for a 
Map element).
+            let prev_child_is_lengths = children.iter().any(|c| {
+                c.physical_index == phys_idx - 1
+                    && c.parent_logical_col == parent
+                    && matches!(
+                        c.element_field.data_type(),
+                        DataType::List(_) | DataType::Map(_, _)
+                    )
+            });
+            let lengths_idx = if prev_child_is_lengths {
+                phys_idx - 1
+            } else {
+                parent
+            };
+
+            let lengths = arrays[lengths_idx].clone();
+            let lengths_rows = lengths.len();
+
+            // Reconstruct the MAP type from the element_field
+            // The child at this position has element_field = key Field
+            // We need the full MAP entries_field. Get it from the container.
+            let container_dt = if lengths_idx < logical_types.len() {
+                &logical_types[lengths_idx]
+            } else {
+                // Nested: the reassembled array at lengths_idx should already 
have
+                // the right type, but we need the MAP descriptor.
+                // Use the child's element_field to reconstruct.
+                elem_dt
+            };
+
+            if let DataType::Map(entries_field, sorted) = container_dt {
+                arrays[lengths_idx] = reassemble_map_array(
+                    lengths,
+                    keys,
+                    values,
+                    entries_field,
+                    *sorted,
+                    lengths_rows,
+                );
+            } else {
+                // Reconstruct MAP from key/value fields
+                let key_field = child.element_field.clone();
+                let val_field = val_child.element_field.clone();
+                let entries_field = Arc::new(Field::new(
+                    "entries",
+                    DataType::Struct(arrow_schema::Fields::from(vec![
+                        key_field.as_ref().clone(),
+                        val_field.as_ref().clone(),
+                    ])),
+                    false,
+                ));
+                arrays[lengths_idx] = reassemble_map_array(
+                    lengths,
+                    keys,
+                    values,
+                    &entries_field,
+                    false,
+                    lengths_rows,
+                );
+            }
+            processed[idx] = true;
+            processed[idx + 1] = true;
+        } else {
+            // ARRAY: single child for values
+            let values = arrays[phys_idx].clone();
+            let is_nested = children
+                .iter()
+                .any(|c| c.physical_index == phys_idx - 1 && 
c.parent_logical_col == parent);
+            let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
+
+            let lengths = arrays[lengths_idx].clone();
+            let lengths_rows = lengths.len();
+            let element_field = child.element_field.clone();
+            arrays[lengths_idx] =
+                reassemble_list_array(lengths, values, element_field, 
lengths_rows);
+            processed[idx] = true;
+        }
     }
 
-    // Handle ALL_NULL list columns (where no children were created because 
all null)
+    // Handle ALL_NULL list/map columns
     for (i, lt) in logical_types.iter().enumerate() {
-        if matches!(lt, DataType::List(_)) && !children.iter().any(|c| 
c.parent_logical_col == i) {
+        if (matches!(lt, DataType::List(_)) || matches!(lt, DataType::Map(_, 
_)))
+            && !children.iter().any(|c| c.parent_logical_col == i)
+        {
             arrays[i] = arrow_array::new_null_array(lt, num_rows);
         }
     }
 }
 
+fn reassemble_map_array(
+    lengths: ArrayRef,
+    keys: ArrayRef,
+    values: ArrayRef,
+    entries_field: &Arc<Field>,
+    sorted: bool,
+    num_rows: usize,
+) -> ArrayRef {
+    let lengths_arr = lengths
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .expect("map lengths must be Int32Array");
+
+    let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
+    offsets.push(0);
+    for i in 0..num_rows {
+        let len = if lengths_arr.is_null(i) {
+            0
+        } else {
+            lengths_arr.value(i)
+        };
+        offsets.push(offsets.last().unwrap() + len);
+    }
+
+    let null_buf = lengths_arr.nulls().cloned();
+    let entries = StructArray::new(
+        match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            _ => unreachable!(),
+        },
+        vec![keys, values],
+        None,
+    );
+
+    Arc::new(MapArray::new(
+        entries_field.clone(),
+        OffsetBuffer::new(ScalarBuffer::from(offsets)),
+        entries,
+        null_buf,
+        sorted,
+    ))
+}
+
 fn reassemble_list_array(
     lengths: ArrayRef,
     values: ArrayRef,
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index 4bfd606..a826bb6 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -153,43 +153,81 @@ impl BucketWriter {
         let start_row = self.num_rows;
         let mut total_size = 0;
 
-        // Split List arrays into lengths + values, collect child writes
-        let mut list_splits: Vec<(usize, Int32Array, ArrayRef)> = Vec::new();
+        // Split List/Map arrays into lengths + child values
+        struct ColSplit {
+            col: usize,
+            lengths: Int32Array,
+            child_arrays: Vec<ArrayRef>, // 1 for List, 2 for Map (keys + 
values)
+        }
+        let mut splits: Vec<ColSplit> = Vec::new();
+        let mut seen_cols = Vec::new();
         for child in &self.children {
             let col = child.parent_logical_col;
-            if list_splits.iter().any(|(c, _, _)| *c == col) {
-                continue; // already split this column (nested lists share 
parent)
+            if seen_cols.contains(&col) {
+                continue;
+            }
+            seen_cols.push(col);
+
+            match data_types[col] {
+                DataType::List(_) => {
+                    let list_array = arrays[col]
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| {
+                            io::Error::new(io::ErrorKind::InvalidInput, 
"expected ListArray")
+                        })?;
+                    let lengths = extract_list_lengths(list_array);
+                    let values = flatten_list_values(list_array);
+                    splits.push(ColSplit {
+                        col,
+                        lengths,
+                        child_arrays: vec![values],
+                    });
+                }
+                DataType::Map(_, _) => {
+                    let map_array =
+                        arrays[col]
+                            .as_any()
+                            .downcast_ref::<MapArray>()
+                            .ok_or_else(|| {
+                                io::Error::new(io::ErrorKind::InvalidInput, 
"expected MapArray")
+                            })?;
+                    let lengths = extract_map_lengths(map_array);
+                    let (keys, values) = flatten_map_entries(map_array);
+                    splits.push(ColSplit {
+                        col,
+                        lengths,
+                        child_arrays: vec![keys, values],
+                    });
+                }
+                _ => {}
             }
-            let list_array = arrays[col]
-                .as_any()
-                .downcast_ref::<ListArray>()
-                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, 
"expected ListArray"))?;
-            let lengths = extract_list_lengths(list_array);
-            let values = flatten_list_values(list_array);
-            list_splits.push((col, lengths, values));
         }
 
-        // Write primary columns (lengths for ARRAY cols, regular data for 
others)
+        // Write primary columns (lengths for ARRAY/MAP cols, regular data for 
others)
         let int32_dt = DataType::Int32;
         for i in 0..self.num_primary {
-            if let Some(split) = list_splits.iter().find(|(col, _, _)| *col == 
i) {
-                total_size += self.append_array_column(i, &split.1, &int32_dt, 
start_row)?;
+            if let Some(split) = splits.iter().find(|s| s.col == i) {
+                total_size += self.append_array_column(i, &split.lengths, 
&int32_dt, start_row)?;
             } else {
                 total_size += self.append_array_column(i, arrays[i], 
data_types[i], start_row)?;
             }
         }
 
-        // Recursively flatten all list levels and write to child columns
-        let mut pending: Vec<(usize, ArrayRef)> = Vec::new(); // (child_index, 
values)
-        for split in &list_splits {
-            let parent = split.0;
-            // Find the first child for this parent
-            if let Some(child_idx) = self
+        // Write child columns — for List: 1 child (values), for Map: 2 
children (keys, values)
+        let mut pending: Vec<(usize, ArrayRef)> = Vec::new();
+        for split in &splits {
+            let parent_children: Vec<usize> = self
                 .children
                 .iter()
-                .position(|c| c.parent_logical_col == parent)
-            {
-                pending.push((child_idx, split.2.clone()));
+                .enumerate()
+                .filter(|(_, c)| c.parent_logical_col == split.col)
+                .map(|(idx, _)| idx)
+                .collect();
+            for (i, child_arr) in split.child_arrays.iter().enumerate() {
+                if i < parent_children.len() {
+                    pending.push((parent_children[i], child_arr.clone()));
+                }
             }
         }
 
@@ -200,24 +238,45 @@ impl BucketWriter {
             let phys_idx = self.children[child_idx].physical_index;
             let child_start = self.children[child_idx].num_elements;
 
-            if let DataType::List(_) = values.data_type() {
-                let inner_list = 
values.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
-                    io::Error::new(io::ErrorKind::InvalidInput, "expected 
ListArray")
-                })?;
-                let inner_lengths = extract_list_lengths(inner_list);
-                let inner_values = flatten_list_values(inner_list);
-                total_size +=
-                    self.append_array_column(phys_idx, &inner_lengths, 
&int32_dt, child_start)?;
-                self.children[child_idx].num_elements += inner_lengths.len();
-                // Queue the inner values for the next child
-                if child_idx + 1 < self.children.len() {
-                    pending.push((child_idx + 1, inner_values));
+            match values.data_type() {
+                DataType::List(_) => {
+                    let inner_list =
+                        
values.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+                            io::Error::new(io::ErrorKind::InvalidInput, 
"expected ListArray")
+                        })?;
+                    let inner_lengths = extract_list_lengths(inner_list);
+                    let inner_values = flatten_list_values(inner_list);
+                    total_size +=
+                        self.append_array_column(phys_idx, &inner_lengths, 
&int32_dt, child_start)?;
+                    self.children[child_idx].num_elements += 
inner_lengths.len();
+                    if child_idx + 1 < self.children.len() {
+                        pending.push((child_idx + 1, inner_values));
+                    }
+                }
+                DataType::Map(_, _) => {
+                    let inner_map =
+                        
values.as_any().downcast_ref::<MapArray>().ok_or_else(|| {
+                            io::Error::new(io::ErrorKind::InvalidInput, 
"expected MapArray")
+                        })?;
+                    let inner_lengths = extract_map_lengths(inner_map);
+                    let (inner_keys, inner_values) = 
flatten_map_entries(inner_map);
+                    total_size +=
+                        self.append_array_column(phys_idx, &inner_lengths, 
&int32_dt, child_start)?;
+                    self.children[child_idx].num_elements += 
inner_lengths.len();
+                    // Queue keys and values for the next children
+                    if child_idx + 2 < self.children.len() {
+                        pending.push((child_idx + 2, inner_values));
+                    }
+                    if child_idx + 1 < self.children.len() {
+                        pending.push((child_idx + 1, inner_keys));
+                    }
+                }
+                _ => {
+                    let elem_dt = 
self.children[child_idx].element_field.data_type().clone();
+                    total_size +=
+                        self.append_array_column(phys_idx, values.as_ref(), 
&elem_dt, child_start)?;
+                    self.children[child_idx].num_elements += values.len();
                 }
-            } else {
-                let elem_dt = 
self.children[child_idx].element_field.data_type().clone();
-                total_size +=
-                    self.append_array_column(phys_idx, values.as_ref(), 
&elem_dt, child_start)?;
-                self.children[child_idx].num_elements += values.len();
             }
         }
 
@@ -1044,6 +1103,58 @@ fn i128_to_biginteger_bytes(val: i128) -> Vec<u8> {
     bytes[start..].to_vec()
 }
 
+fn extract_map_lengths(map_array: &MapArray) -> Int32Array {
+    let offsets = map_array.value_offsets();
+    let num_rows = map_array.len();
+    let mut lengths = Vec::with_capacity(num_rows);
+    for i in 0..num_rows {
+        if map_array.is_null(i) {
+            lengths.push(0);
+        } else {
+            lengths.push(offsets[i + 1] - offsets[i]);
+        }
+    }
+    let null_buf = map_array.nulls().cloned();
+    Int32Array::new(ScalarBuffer::from(lengths), null_buf)
+}
+
+fn flatten_map_entries(map_array: &MapArray) -> (ArrayRef, ArrayRef) {
+    let offsets = map_array.value_offsets();
+    let num_rows = map_array.len();
+    let keys = map_array.keys();
+    let values = map_array.values();
+
+    if map_array.null_count() == 0 {
+        let start = offsets[0] as usize;
+        let end = offsets[num_rows] as usize;
+        return (
+            keys.slice(start, end - start),
+            values.slice(start, end - start),
+        );
+    }
+
+    let mut indices: Vec<u32> = Vec::new();
+    for i in 0..num_rows {
+        if !map_array.is_null(i) {
+            let start = offsets[i] as u32;
+            let end = offsets[i + 1] as u32;
+            for idx in start..end {
+                indices.push(idx);
+            }
+        }
+    }
+
+    if indices.is_empty() {
+        return (keys.slice(0, 0), values.slice(0, 0));
+    }
+
+    let idx_array = UInt32Array::from(indices);
+    (
+        take_array(keys.as_ref(), &idx_array),
+        take_array(values.as_ref(), &idx_array),
+    )
+}
+
 fn extract_list_lengths(list_array: &ListArray) -> Int32Array {
     let offsets = list_array.value_offsets();
     let num_rows = list_array.len();
@@ -1268,6 +1379,58 @@ fn take_array(array: &dyn Array, indices: &UInt32Array) 
-> ArrayRef {
                 null_buf,
             ))
         }
+        DataType::Map(entries_field, sorted) => {
+            let src = array.as_any().downcast_ref::<MapArray>().unwrap();
+            let mut offsets_builder = vec![0i32];
+            let mut child_indices: Vec<u32> = Vec::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                let start = src.value_offsets()[idx] as u32;
+                let end = src.value_offsets()[idx + 1] as u32;
+                for ci in start..end {
+                    child_indices.push(ci);
+                }
+                offsets_builder.push(child_indices.len() as i32);
+            }
+            let child_idx_arr = UInt32Array::from(child_indices);
+            let new_keys = take_array(src.keys().as_ref(), &child_idx_arr);
+            let new_values = take_array(src.values().as_ref(), &child_idx_arr);
+            let null_buf = if !indices.is_empty() {
+                let mut bm = vec![0u8; indices.len().div_ceil(8)];
+                for i in 0..indices.len() {
+                    let idx = indices.value(i) as usize;
+                    if !src.is_null(idx) {
+                        bm[i / 8] |= 1 << (i % 8);
+                    }
+                }
+                if bm.iter().all(|&b| b == 0xFF) || indices.is_empty() {
+                    None
+                } else {
+                    Some(NullBuffer::new(BooleanBuffer::new(
+                        Buffer::from_vec(bm),
+                        0,
+                        indices.len(),
+                    )))
+                }
+            } else {
+                None
+            };
+            let entries_struct = StructArray::new(
+                match entries_field.data_type() {
+                    DataType::Struct(fields) => fields.clone(),
+                    _ => unreachable!(),
+                },
+                vec![new_keys, new_values],
+                None,
+            );
+            Arc::new(MapArray::new(
+                entries_field.clone(),
+                OffsetBuffer::new(ScalarBuffer::from(offsets_builder)),
+                entries_struct,
+                null_buf,
+                *sorted,
+            ))
+        }
         other => panic!("take_array: unsupported DataType {:?}", other),
     }
 }
@@ -1276,7 +1439,7 @@ pub(crate) fn expand_col_types(col_types: &[&DataType]) 
-> (Vec<DataType>, Vec<C
     let mut physical_types: Vec<DataType> = col_types
         .iter()
         .map(|t| {
-            if matches!(t, DataType::List(_)) {
+            if matches!(t, DataType::List(_) | DataType::Map(_, _)) {
                 DataType::Int32
             } else {
                 (*t).clone()
@@ -1286,14 +1449,34 @@ pub(crate) fn expand_col_types(col_types: &[&DataType]) 
-> (Vec<DataType>, Vec<C
     let mut children = Vec::new();
 
     for (i, t) in col_types.iter().enumerate() {
-        if let DataType::List(element_field) = t {
-            expand_child(i, element_field, &mut physical_types, &mut children);
-        }
+        expand_type(i, t, &mut physical_types, &mut children);
     }
     (physical_types, children)
 }
 
-fn expand_child(
+fn expand_type(
+    parent_logical: usize,
+    dt: &DataType,
+    physical_types: &mut Vec<DataType>,
+    children: &mut Vec<ChildColumnMeta>,
+) {
+    match dt {
+        DataType::List(element_field) => {
+            expand_element(parent_logical, element_field, physical_types, 
children);
+        }
+        DataType::Map(entries_field, _) => {
+            if let DataType::Struct(fields) = entries_field.data_type() {
+                // Keys child — recurse if key is complex
+                expand_element(parent_logical, &fields[0], physical_types, 
children);
+                // Values child — recurse if value is complex
+                expand_element(parent_logical, &fields[1], physical_types, 
children);
+            }
+        }
+        _ => {}
+    }
+}
+
+fn expand_element(
     parent_logical: usize,
     element_field: &Arc<Field>,
     physical_types: &mut Vec<DataType>,
@@ -1302,23 +1485,28 @@ fn expand_child(
     let elem_dt = element_field.data_type();
     let child_phys_idx = physical_types.len();
 
-    if let DataType::List(inner_field) = elem_dt {
-        physical_types.push(DataType::Int32);
-        children.push(ChildColumnMeta {
-            parent_logical_col: parent_logical,
-            physical_index: child_phys_idx,
-            element_field: element_field.clone(),
-            num_elements: 0,
-        });
-        expand_child(parent_logical, inner_field, physical_types, children);
-    } else {
-        physical_types.push(elem_dt.clone());
-        children.push(ChildColumnMeta {
-            parent_logical_col: parent_logical,
-            physical_index: child_phys_idx,
-            element_field: element_field.clone(),
-            num_elements: 0,
-        });
+    match elem_dt {
+        DataType::List(_) | DataType::Map(_, _) => {
+            // Complex element: this child stores lengths (INT32), recurse for 
deeper levels
+            physical_types.push(DataType::Int32);
+            children.push(ChildColumnMeta {
+                parent_logical_col: parent_logical,
+                physical_index: child_phys_idx,
+                element_field: element_field.clone(),
+                num_elements: 0,
+            });
+            expand_type(parent_logical, elem_dt, physical_types, children);
+        }
+        _ => {
+            // Primitive element: direct leaf column
+            physical_types.push(elem_dt.clone());
+            children.push(ChildColumnMeta {
+                parent_logical_col: parent_logical,
+                physical_index: child_phys_idx,
+                element_field: element_field.clone(),
+                num_elements: 0,
+            });
+        }
     }
 }
 
diff --git a/core/src/types.rs b/core/src/types.rs
index 9326c57..7f4a06d 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -104,12 +104,31 @@ pub fn validate_data_type(dt: &DataType) -> Result<(), 
String> {
         },
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 
Ok(()),
         DataType::List(field) => {
-            if let DataType::Struct(fields) = field.data_type() {
+            let elem = field.data_type();
+            if let DataType::Struct(fields) = elem {
                 if is_timestamp_nanos_struct(fields) {
                     return Err("ARRAY<legacy timestamp nanos struct> is not 
supported".to_string());
                 }
             }
-            validate_data_type(field.data_type())
+            validate_data_type(elem)
+        }
+        DataType::Map(entries_field, sorted) => {
+            if *sorted {
+                return Err("sorted MAP is not supported".to_string());
+            }
+            if let DataType::Struct(fields) = entries_field.data_type() {
+                if fields.len() != 2 {
+                    return Err("MAP entries struct must have exactly 2 
fields".to_string());
+                }
+                let key_dt = fields[0].data_type();
+                if matches!(key_dt, DataType::List(_) | DataType::Map(_, _)) {
+                    return Err("MAP key type cannot be ARRAY or 
MAP".to_string());
+                }
+                validate_data_type(key_dt)?;
+                validate_data_type(fields[1].data_type())
+            } else {
+                Err("MAP entries field must be a Struct".to_string())
+            }
         }
         _ => Err(format!("unsupported DataType: {:?}", dt)),
     }
@@ -133,6 +152,7 @@ pub fn data_type_to_type_byte(dt: &DataType) -> u8 {
         DataType::Timestamp(_, Some(_)) => 17,
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 16,
         DataType::List(_) => 18,
+        DataType::Map(_, _) => 19,
         _ => panic!("unsupported DataType for serialization: {:?}", dt),
     }
 }
@@ -192,10 +212,52 @@ pub fn serialize_field(field: &Field, buf: &mut Vec<u8>) {
             buf.extend_from_slice(name_bytes);
             serialize_field(element_field, buf);
         }
+        DataType::Map(entries_field, _sorted) => {
+            // entries field name
+            let entries_name = entries_field.name().as_bytes();
+            varint::encode(buf, entries_name.len() as u32);
+            buf.extend_from_slice(entries_name);
+            if let DataType::Struct(fields) = entries_field.data_type() {
+                // key field name + type
+                let key_name = fields[0].name().as_bytes();
+                varint::encode(buf, key_name.len() as u32);
+                buf.extend_from_slice(key_name);
+                serialize_field(&fields[0], buf);
+                // value field name + type
+                let val_name = fields[1].name().as_bytes();
+                varint::encode(buf, val_name.len() as u32);
+                buf.extend_from_slice(val_name);
+                serialize_field(&fields[1], buf);
+            }
+        }
         _ => {}
     }
 }
 
+fn read_utf8_field_name(
+    buf: &[u8],
+    pos: &mut usize,
+    len: usize,
+    context: &str,
+) -> Result<String, std::io::Error> {
+    if *pos + len > buf.len() {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::UnexpectedEof,
+            format!("type: not enough bytes for {} field name", context),
+        ));
+    }
+    let name = std::str::from_utf8(&buf[*pos..*pos + len])
+        .map_err(|_| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!("type: invalid UTF-8 in {} field name", context),
+            )
+        })?
+        .to_string();
+    *pos += len;
+    Ok(name)
+}
+
 pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut usize) -> 
Result<Field, std::io::Error> {
     if *pos + 1 >= buf.len() {
         return Err(std::io::Error::new(
@@ -293,6 +355,25 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut usize) -> Result<Fiel
             let element_field = deserialize_field(&element_name, buf, pos)?;
             DataType::List(std::sync::Arc::new(element_field))
         }
+        19 => {
+            // entries field name
+            let entries_name_len = varint::decode(buf, pos)? as usize;
+            let entries_name = read_utf8_field_name(buf, pos, entries_name_len, "MAP entries")?;
+            // key field name + type
+            let key_name_len = varint::decode(buf, pos)? as usize;
+            let key_name = read_utf8_field_name(buf, pos, key_name_len, "MAP key")?;
+            let key_field = deserialize_field(&key_name, buf, pos)?;
+            // value field name + type
+            let val_name_len = varint::decode(buf, pos)? as usize;
+            let val_name = read_utf8_field_name(buf, pos, val_name_len, "MAP value")?;
+            let value_field = deserialize_field(&val_name, buf, pos)?;
+            let entries_field = Field::new(
+                &entries_name,
+                DataType::Struct(Fields::from(vec![key_field, value_field])),
+                false,
+            );
+            DataType::Map(std::sync::Arc::new(entries_field), false)
+        }
         _ => {
             return Err(std::io::Error::new(
                 std::io::ErrorKind::InvalidData,
diff --git a/core/tests/array_type_test.rs b/core/tests/array_type_test.rs
index ad899e2..53d9ba0 100644
--- a/core/tests/array_type_test.rs
+++ b/core/tests/array_type_test.rs
@@ -854,3 +854,459 @@ fn test_project_one_array_from_multi_array_paged() {
     assert_eq!(r0.value(0), 1);
     assert_eq!(r0.value(1), 2);
 }
+
+// ======================== MAP Tests ========================
+
+#[test]
+fn test_map_int_string_basic() {
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new(
+            "map",
+            DataType::Map(
+                Arc::new(Field::new(
+                    "entries",
+                    DataType::Struct(arrow_schema::Fields::from(vec![
+                        Field::new("keys", DataType::Int32, false),
+                        Field::new("values", DataType::Utf8, true),
+                    ])),
+                    false,
+                )),
+                false,
+            ),
+            true,
+        ),
+    ]);
+
+    let ids = Int32Array::from(vec![1, 2, 3, 4]);
+
+    let key_builder = Int32Builder::new();
+    let value_builder = StringBuilder::new();
+    let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+
+    // row 0: {1: "a", 2: "b"}
+    map_builder.keys().append_value(1);
+    map_builder.values().append_value("a");
+    map_builder.keys().append_value(2);
+    map_builder.values().append_value("b");
+    map_builder.append(true).unwrap();
+
+    // row 1: null
+    map_builder.append(false).unwrap();
+
+    // row 2: {3: null}
+    map_builder.keys().append_value(3);
+    map_builder.values().append_null();
+    map_builder.append(true).unwrap();
+
+    // row 3: {} (empty)
+    map_builder.append(true).unwrap();
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(ids), Arc::new(map_builder.finish())],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let rb = &result[0];
+    assert_eq!(rb.num_rows(), 4);
+
+    let map_col = rb.column(1).as_any().downcast_ref::<MapArray>().unwrap();
+    assert_eq!(map_col.len(), 4);
+    assert!(!map_col.is_null(0));
+    assert!(map_col.is_null(1));
+    assert!(!map_col.is_null(2));
+    assert!(!map_col.is_null(3));
+
+    // row 0: {1: "a", 2: "b"}
+    let keys0 = map_col
+        .keys()
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap();
+    let vals0 = map_col
+        .values()
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap();
+    assert_eq!(map_col.value_offsets()[0], 0);
+    assert_eq!(map_col.value_offsets()[1], 2);
+    assert_eq!(keys0.value(0), 1);
+    assert_eq!(keys0.value(1), 2);
+    assert_eq!(vals0.value(0), "a");
+    assert_eq!(vals0.value(1), "b");
+
+    // row 2: {3: null}
+    assert_eq!(map_col.value_offsets()[3] - map_col.value_offsets()[2], 1);
+
+    // row 3: empty
+    assert_eq!(map_col.value_offsets()[4] - map_col.value_offsets()[3], 0);
+}
+
+#[test]
+fn test_map_all_null() {
+    let entries_field = Field::new(
+        "entries",
+        DataType::Struct(arrow_schema::Fields::from(vec![
+            Field::new("keys", DataType::Utf8, false),
+            Field::new("values", DataType::Int64, true),
+        ])),
+        false,
+    );
+    let schema = Schema::new(vec![Field::new(
+        "m",
+        DataType::Map(Arc::new(entries_field), false),
+        true,
+    )]);
+
+    let key_builder = StringBuilder::new();
+    let value_builder = Int64Builder::new();
+    let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+    map_builder.append(false).unwrap();
+    map_builder.append(false).unwrap();
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(map_builder.finish())],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<MapArray>()
+        .unwrap();
+    assert_eq!(col.len(), 2);
+    assert!(col.is_null(0));
+    assert!(col.is_null(1));
+}
+
+#[test]
+fn test_map_with_other_columns() {
+    let entries_field = Field::new(
+        "entries",
+        DataType::Struct(arrow_schema::Fields::from(vec![
+            Field::new("keys", DataType::Utf8, false),
+            Field::new("values", DataType::Float64, true),
+        ])),
+        false,
+    );
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("tags", DataType::List(element_field.clone()), true),
+        Field::new("props", DataType::Map(Arc::new(entries_field), false), true),
+    ]);
+
+    let ids = Int64Array::from(vec![1, 2]);
+
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    list_builder.values().append_value(10);
+    list_builder.append(true);
+    list_builder.append(false);
+
+    let key_builder = StringBuilder::new();
+    let value_builder = Float64Builder::new();
+    let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+    map_builder.keys().append_value("x");
+    map_builder.values().append_value(1.5);
+    map_builder.append(true).unwrap();
+    map_builder.keys().append_value("y");
+    map_builder.values().append_value(2.5);
+    map_builder.keys().append_value("z");
+    map_builder.values().append_value(3.5);
+    map_builder.append(true).unwrap();
+
+    let mut opts = WriterOptions::default();
+    opts.num_buckets = 1;
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![
+            Arc::new(ids),
+            Arc::new(list_builder.finish()),
+            Arc::new(map_builder.finish()),
+        ],
+    )
+    .unwrap();
+
+    let result = roundtrip_with_options(&schema, &[batch], opts);
+    let rb = &result[0];
+    assert_eq!(rb.num_rows(), 2);
+
+    let result_ids = rb.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+    assert_eq!(result_ids.value(0), 1);
+
+    let result_tags = rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    assert!(!result_tags.is_null(0));
+    assert!(result_tags.is_null(1));
+
+    let result_props = rb.column(2).as_any().downcast_ref::<MapArray>().unwrap();
+    assert_eq!(result_props.len(), 2);
+    assert_eq!(
+        result_props.value_offsets()[1] - result_props.value_offsets()[0],
+        1
+    );
+    assert_eq!(
+        result_props.value_offsets()[2] - result_props.value_offsets()[1],
+        2
+    );
+}
+
+// ======================== Nested ARRAY/MAP Tests ========================
+
+#[test]
+fn test_array_of_map() {
+    // ARRAY<MAP<INT32, UTF8>>
+    let map_type = DataType::Map(
+        Arc::new(Field::new(
+            "entries",
+            DataType::Struct(arrow_schema::Fields::from(vec![
+                Field::new("keys", DataType::Int32, false),
+                Field::new("values", DataType::Utf8, true),
+            ])),
+            false,
+        )),
+        false,
+    );
+    let schema = Schema::new(vec![Field::new(
+        "col",
+        DataType::List(Arc::new(Field::new("item", map_type.clone(), true))),
+        true,
+    )]);
+
+    // Build: row 0 = [{1:"a"}, {2:"b", 3:"c"}], row 1 = null
+    let key_builder = Int32Builder::new();
+    let val_builder = StringBuilder::new();
+    let map_builder = MapBuilder::new(None, key_builder, val_builder);
+    let mut list_builder = ListBuilder::new(map_builder);
+
+    // row 0: [{1:"a"}, {2:"b", 3:"c"}]
+    list_builder.values().keys().append_value(1);
+    list_builder.values().values().append_value("a");
+    list_builder.values().append(true).unwrap();
+    list_builder.values().keys().append_value(2);
+    list_builder.values().values().append_value("b");
+    list_builder.values().keys().append_value(3);
+    list_builder.values().values().append_value("c");
+    list_builder.values().append(true).unwrap();
+    list_builder.append(true);
+
+    // row 1: null
+    list_builder.append(false);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(list_builder.finish())],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(col.len(), 2);
+    assert!(!col.is_null(0));
+    assert!(col.is_null(1));
+
+    let row0 = col.value(0);
+    let maps = row0.as_any().downcast_ref::<MapArray>().unwrap();
+    assert_eq!(maps.len(), 2);
+    assert_eq!(maps.value_length(0), 1); // {1:"a"}
+    assert_eq!(maps.value_length(1), 2); // {2:"b", 3:"c"}
+}
+
+#[test]
+fn test_map_with_array_value() {
+    // MAP<UTF8, ARRAY<INT32>>
+    let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
+    let schema = Schema::new(vec![Field::new(
+        "col",
+        DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(arrow_schema::Fields::from(vec![
+                    Field::new("keys", DataType::Utf8, false),
+                    Field::new("values", list_type.clone(), true),
+                ])),
+                false,
+            )),
+            false,
+        ),
+        true,
+    )]);
+
+    // Build: row 0 = {"x": [1,2], "y": [3]}, row 1 = {}
+    let key_builder = StringBuilder::new();
+    let val_builder = ListBuilder::new(Int32Builder::new());
+    let mut map_builder = MapBuilder::new(None, key_builder, val_builder);
+
+    // row 0
+    map_builder.keys().append_value("x");
+    map_builder.values().values().append_value(1);
+    map_builder.values().values().append_value(2);
+    map_builder.values().append(true);
+    map_builder.keys().append_value("y");
+    map_builder.values().values().append_value(3);
+    map_builder.values().append(true);
+    map_builder.append(true).unwrap();
+
+    // row 1: empty
+    map_builder.append(true).unwrap();
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(map_builder.finish())],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<MapArray>()
+        .unwrap();
+    assert_eq!(col.len(), 2);
+    assert_eq!(col.value_length(0), 2); // 2 entries
+    assert_eq!(col.value_length(1), 0); // empty
+
+    let keys = col.keys().as_any().downcast_ref::<StringArray>().unwrap();
+    assert_eq!(keys.value(0), "x");
+    assert_eq!(keys.value(1), "y");
+
+    let vals = col.values().as_any().downcast_ref::<ListArray>().unwrap();
+    let v0 = vals
+        .value(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(v0.len(), 2);
+    assert_eq!(v0.value(0), 1);
+    assert_eq!(v0.value(1), 2);
+    let v1 = vals
+        .value(1)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(v1.len(), 1);
+    assert_eq!(v1.value(0), 3);
+}
+
+// ======================== MAP Schema Validation Tests ========================
+
+#[test]
+fn test_map_custom_field_names_roundtrip() {
+    let schema = Schema::new(vec![Field::new(
+        "m",
+        DataType::Map(
+            Arc::new(Field::new(
+                "my_entries",
+                DataType::Struct(arrow_schema::Fields::from(vec![
+                    Field::new("k_custom", DataType::Int32, false),
+                    Field::new("v_custom", DataType::Utf8, true),
+                ])),
+                false,
+            )),
+            false,
+        ),
+        true,
+    )]);
+
+    let key_builder = Int32Builder::new();
+    let value_builder = StringBuilder::new();
+    let field_names = arrow_array::builder::MapFieldNames {
+        entry: "my_entries".to_string(),
+        key: "k_custom".to_string(),
+        value: "v_custom".to_string(),
+    };
+    let mut map_builder = MapBuilder::new(Some(field_names), key_builder, value_builder);
+    map_builder.keys().append_value(1);
+    map_builder.values().append_value("a");
+    map_builder.append(true).unwrap();
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(map_builder.finish())],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let map_type = result[0].schema().field(0).data_type().clone();
+    match map_type {
+        DataType::Map(entries, sorted) => {
+            assert!(!sorted);
+            assert_eq!(entries.name(), "my_entries");
+            if let DataType::Struct(fields) = entries.data_type() {
+                assert_eq!(fields[0].name(), "k_custom");
+                assert_eq!(fields[1].name(), "v_custom");
+            } else {
+                panic!("entries should be struct");
+            }
+        }
+        other => panic!("expected map, got {:?}", other),
+    }
+}
+
+#[test]
+fn test_sorted_map_rejected() {
+    let schema = Schema::new(vec![Field::new(
+        "m",
+        DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(arrow_schema::Fields::from(vec![
+                    Field::new("keys", DataType::Int32, false),
+                    Field::new("values", DataType::Utf8, true),
+                ])),
+                false,
+            )),
+            true, // sorted = true
+        ),
+        true,
+    )]);
+
+    let out = MemOutputFile::new();
+    match MosaicWriter::new(out, &schema, WriterOptions::default()) {
+        Ok(_) => panic!("sorted MAP should be rejected"),
+        Err(e) => assert!(
+            e.to_string().contains("sorted"),
+            "error should mention sorted: {}",
+            e
+        ),
+    }
+}
+
+#[test]
+fn test_complex_map_key_rejected() {
+    let key_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
+    let schema = Schema::new(vec![Field::new(
+        "m",
+        DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(arrow_schema::Fields::from(vec![
+                    Field::new("keys", key_type, false),
+                    Field::new("values", DataType::Utf8, true),
+                ])),
+                false,
+            )),
+            false,
+        ),
+        true,
+    )]);
+
+    let out = MemOutputFile::new();
+    match MosaicWriter::new(out, &schema, WriterOptions::default()) {
+        Ok(_) => panic!("complex MAP key should be rejected"),
+        Err(e) => assert!(
+            e.to_string().contains("MAP key"),
+            "error should mention MAP key: {}",
+            e
+        ),
+    }
+}
diff --git a/core/tests/gen_fixtures.rs b/core/tests/gen_fixtures.rs
index 2cd2df3..5ac49f8 100644
--- a/core/tests/gen_fixtures.rs
+++ b/core/tests/gen_fixtures.rs
@@ -216,3 +216,91 @@ fn test_v1_with_array_golden_readable() {
         .clone();
     assert_eq!(r3.len(), 0);
 }
+
+/// Generate the deterministic MAP file.
+/// Schema: id(INT32 NOT NULL), props(MAP<INT32, UTF8>)
+/// Data: 3 rows — {1:"a", 2:"b"}, null, {}
+/// Options: num_buckets=1, compression=none
+fn gen_with_map() -> Vec<u8> {
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new(
+            "props",
+            DataType::Map(
+                Arc::new(Field::new(
+                    "entries",
+                    DataType::Struct(arrow_schema::Fields::from(vec![
+                        Field::new("keys", DataType::Int32, false),
+                        Field::new("values", DataType::Utf8, true),
+                    ])),
+                    false,
+                )),
+                false,
+            ),
+            true,
+        ),
+    ]);
+
+    let ids = Int32Array::from(vec![1, 2, 3]);
+    let key_builder = Int32Builder::new();
+    let value_builder = StringBuilder::new();
+    let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+    // row 0: {1: "a", 2: "b"}
+    map_builder.keys().append_value(1);
+    map_builder.values().append_value("a");
+    map_builder.keys().append_value(2);
+    map_builder.values().append_value("b");
+    map_builder.append(true).unwrap();
+    // row 1: null
+    map_builder.append(false).unwrap();
+    // row 2: {} (empty)
+    map_builder.append(true).unwrap();
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(ids), Arc::new(map_builder.finish())],
+    )
+    .unwrap();
+
+    let out = MemOutputFile::new();
+    let opts = WriterOptions {
+        num_buckets: 1,
+        compression: 0,
+        ..WriterOptions::default()
+    };
+    let mut writer = MosaicWriter::new(out, &schema, opts).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+    writer.output().buf.clone()
+}
+
+#[test]
+fn test_v1_with_map_binary_stable() {
+    let generated = gen_with_map();
+    let golden = std::fs::read(golden_path("v1_with_map.mosaic"))
+        .expect("golden file missing — regenerate with gen_with_map()");
+    assert_eq!(
+        generated, golden,
+        "MAP file differs from golden — format may have changed unintentionally"
+    );
+}
+
+#[test]
+fn test_v1_with_map_golden_readable() {
+    let data = std::fs::read(golden_path("v1_with_map.mosaic")).unwrap();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let rb = rg.read_columns().unwrap();
+    assert_eq!(rb.num_rows(), 3);
+
+    let map_col = rb.column(1).as_any().downcast_ref::<MapArray>().unwrap();
+    assert!(!map_col.is_null(0));
+    assert!(map_col.is_null(1));
+    assert!(!map_col.is_null(2));
+
+    // Row 0: 2 entries
+    assert_eq!(map_col.value_offsets()[1] - map_col.value_offsets()[0], 2);
+    // Row 2: empty
+    assert_eq!(map_col.value_offsets()[3] - map_col.value_offsets()[2], 0);
+}
diff --git a/core/tests/testdata/v1_with_map.mosaic b/core/tests/testdata/v1_with_map.mosaic
new file mode 100644
index 0000000..efe7a2f
Binary files /dev/null and b/core/tests/testdata/v1_with_map.mosaic differ
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 4f85291..38cf55c 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -1005,6 +1005,60 @@ static void test_array_string_elements() {
     printf("  PASS test_array_string_elements\n");
 }
 
+static void test_map_type() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("props", arrow::map(arrow::int32(), arrow::utf8())),
+    });
+
+    arrow::Int32Builder id_b;
+    assert(id_b.Append(1).ok());
+    assert(id_b.Append(2).ok());
+    assert(id_b.Append(3).ok());
+
+    auto key_builder = std::make_shared<arrow::Int32Builder>();
+    auto val_builder = std::make_shared<arrow::StringBuilder>();
+    arrow::MapBuilder map_b(arrow::default_memory_pool(), key_builder, val_builder);
+
+    // Row 0: {1: "a", 2: "b"}
+    assert(map_b.Append().ok());
+    assert(key_builder->Append(1).ok());
+    assert(val_builder->Append("a").ok());
+    assert(key_builder->Append(2).ok());
+    assert(val_builder->Append("b").ok());
+
+    // Row 1: null
+    assert(map_b.AppendNull().ok());
+
+    // Row 2: {} (empty)
+    assert(map_b.Append().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        id_b.Finish().ValueUnsafe(),
+        map_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 3);
+
+    auto props = std::static_pointer_cast<arrow::MapArray>(rb->GetColumnByName("props"));
+    ASSERT_TRUE(!props->IsNull(0));
+    ASSERT_TRUE(props->IsNull(1));
+    ASSERT_TRUE(!props->IsNull(2));
+
+    // Row 0: 2 entries
+    ASSERT_EQ(props->value_length(0), 2);
+    // Row 2: empty
+    ASSERT_EQ(props->value_length(2), 0);
+
+    printf("  PASS test_map_type\n");
+}
+
 int main() {
     printf("Running Mosaic C++ tests...\n");
     test_basic_roundtrip();
@@ -1025,6 +1079,7 @@ int main() {
     test_array_type();
     test_array_with_null_elements();
     test_array_string_elements();
-    printf("All %d tests passed.\n", 18);
+    test_map_type();
+    printf("All %d tests passed.\n", 19);
     return 0;
 }
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index 0d08ebb..ed3456e 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -182,7 +182,15 @@ list_b.Append();
 <span class="cmt">// Reading back</span>
 <span class="kw">auto</span> tags = std::static_pointer_cast&lt;arrow::ListArray&gt;(rb-&gt;GetColumnByName(<span class="str">"tags"</span>));
 <span class="kw">auto</span> row0 = std::static_pointer_cast&lt;arrow::Int32Array&gt;(tags-&gt;value_slice(<span class="num">0</span>));
-<span class="cmt">// row0: [10, 20]</span></code></pre>
+<span class="cmt">// row0: [10, 20]</span>
+
+<span class="cmt">// MAP columns use MapBuilder</span>
+<span class="kw">auto</span> key_b = std::make_shared&lt;arrow::Int32Builder&gt;();
+<span class="kw">auto</span> val_b = std::make_shared&lt;arrow::StringBuilder&gt;();
+arrow::MapBuilder map_b(arrow::default_memory_pool(), key_b, val_b);
+map_b.Append();
+key_b-&gt;Append(<span class="num">1</span>);
+val_b-&gt;Append(<span class="str">"a"</span>);</code></pre>
 
             <h2>C++ API Reference</h2>
 
diff --git a/docs/design.html b/docs/design.html
index 19ffd81..538a3b0 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -740,9 +740,10 @@ repeated numColumns times:
                     <tr><td>16</td><td>TIMESTAMP</td><td>varint precision</td></tr>
                     <tr><td>17</td><td>TIMESTAMP_LTZ</td><td>varint precision, varint timezoneLength, bytes timezone</td></tr>
                     <tr><td>18</td><td>ARRAY</td><td>varint nameLength, bytes name (element field name), TypeDescriptor (recursive element type)</td></tr>
+                    <tr><td>19</td><td>MAP</td><td>varint entriesNameLen + bytes entriesName, varint keyNameLen + bytes keyName + TypeDescriptor (key), varint valNameLen + bytes valName + TypeDescriptor (value). Sorted MAP is not supported; always unsorted.</td></tr>
                 </tbody>
             </table>
-            <p>MAP, ROW, VARIANT, and BLOB are not yet supported.</p>
+            <p>ROW, VARIANT, and BLOB are not yet supported.</p>
 
             <!-- ============================================================ -->
             <h2>Value Serialization</h2>
@@ -767,6 +768,7 @@ repeated numColumns times:
                     <tr><td>CHAR / VARCHAR / STRING</td><td>varint length + UTF-8 bytes</td></tr>
                     <tr><td>BINARY / VARBINARY / BYTES</td><td>varint length + raw bytes</td></tr>
                     <tr><td>ARRAY</td><td>Flattened columnar: lengths (INT32) + values column (see ARRAY Type Storage below)</td></tr>
+                    <tr><td>MAP</td><td>Flattened columnar: lengths (INT32) + keys column + values column (see MAP Type Storage below)</td></tr>
                 </tbody>
             </table>
             <p>
@@ -857,7 +859,31 @@ repeated numChildren times:
 [column slots: each independently compressed, including child columns]</code></pre>
 
             <h3>Statistics</h3>
-            <p>ARRAY columns do not support min/max statistics (no meaningful ordering).</p>
+            <p>ARRAY and MAP columns do not support min/max statistics (no meaningful ordering).</p>
+
+            <!-- ============================================================ -->
+            <h2>MAP Type Storage</h2>
+            <p>
+                MAP columns use the same flattened columnar approach as ARRAY.
+                A <code>MAP&lt;K, V&gt;</code> column is decomposed into three physical columns
+                within the same bucket:
+            </p>
+            <ol>
+                <li><strong>Lengths column</strong> (INT32, N entries): the number of key-value pairs in each map.</li>
+                <li><strong>Keys column</strong> (type K, M entries): all keys flattened across all rows.</li>
+                <li><strong>Values column</strong> (type V, M entries): all values flattened across all rows.</li>
+            </ol>
+            <p>
+                All three columns independently benefit from standard column encoding (DICT, CONST, PLAIN, ALL_NULL).
+                Null maps are represented by a null in the lengths column. Empty maps have length 0.
+            </p>
+
+            <h4>Example</h4>
+<pre><code>MAP&lt;INT, UTF8&gt; with rows:  {1:"a", 2:"b"},  null,  {3:"c"}
+
+Lengths column (INT32):  2, null, 1
+Keys column    (INT32):  1, 2, 3       &larr; shared DICT across all maps
+Values column  (UTF8):   "a", "b", "c" &larr; shared DICT across all maps</code></pre>
 
             <h3>Design Rationale: Comparison with Parquet and ORC</h3>
             <p>
@@ -906,10 +932,12 @@ repeated numChildren times:
 
             <h4>Why Not Parquet&rsquo;s Repetition/Definition Levels?</h4>
             <p>
-                Parquet uses the Dremel encoding: regardless of nesting depth, each ARRAY column
-                produces a single physical column with repetition and definition level arrays attached
-                to every leaf value. This has the advantage of constant physical column count
-                (always 1 per ARRAY column), but significant disadvantages:
+                Parquet uses the Dremel encoding: each leaf column carries repetition and definition
+                level arrays that encode the full nesting structure. For <code>ARRAY&lt;primitive&gt;</code>,
+                this means one physical leaf column; for nested types like <code>ARRAY&lt;MAP&lt;K,V&gt;&gt;</code>,
+                each leaf (K and V) gets its own column with deeper rep/def levels. The physical column
+                count equals the number of leaf fields, not the number of ARRAY/MAP columns.
+                This approach has some advantages but significant disadvantages:
             </p>
             <ul>
                 <li><strong>Implementation complexity</strong>: The shredding (decomposition) and assembly
@@ -948,7 +976,7 @@ repeated numChildren times:
             <!-- ============================================================ -->
             <h2>Limitations</h2>
             <ol>
-                <li>Complex types MAP, MULTISET, and ROW are not yet supported. ARRAY is supported with flattened columnar storage.</li>
+                <li>Complex types MULTISET and ROW are not yet supported. ARRAY and MAP are supported with flattened columnar storage.</li>
                 <li>Mosaic format is designed for wide tables and may not be efficient for narrow tables with few columns.</li>
             </ol>
         </div>
diff --git a/docs/java-api.html b/docs/java-api.html
index f8f207a..180040e 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -159,6 +159,18 @@ listWriter.endList();</code></pre>
 <span class="ty">List</span>&lt;?&gt; row0 = readTags.getObject(<span class="num">0</span>);  <span class="cmt">// [10, 20, 30]</span>
 <span class="kw">boolean</span> isNull = readTags.isNull(<span class="num">1</span>);    <span class="cmt">// true</span>
 <span class="ty">List</span>&lt;?&gt; row2 = readTags.getObject(<span class="num">2</span>);  <span class="cmt">// [] (empty)</span></code></pre>
+            <p>
+                MAP types use Arrow&rsquo;s <code>MapVector</code> and <code>UnionMapWriter</code>:
+            </p>
+<pre><code><span class="ty">MapVector</span> mapVec = (<span class="ty">MapVector</span>) root.getVector(<span class="str">"props"</span>);
+<span class="ty">UnionMapWriter</span> w = mapVec.getWriter();
+w.setPosition(<span class="num">0</span>);
+w.startMap();
+w.startEntry();
+w.key().integer().writeInt(<span class="num">1</span>);
+w.value().varChar().writeVarChar(<span class="kw">new</span> <span class="ty">Text</span>(<span class="str">"a"</span>));
+w.endEntry();
+w.endMap();</code></pre>
 
             <h3>2. Create Writer and Write Batches</h3>
             <p>
diff --git a/docs/python-api.html b/docs/python-api.html
index fd849dd..e64976b 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -103,6 +103,7 @@ pa_schema = pa.schema([
     pa.field(<span class="str">"id"</span>, pa.int32()),
     pa.field(<span class="str">"tags"</span>, pa.list_(pa.int32())),
     pa.field(<span class="str">"nested"</span>, pa.list_(pa.list_(pa.utf8()))),
+    pa.field(<span class="str">"props"</span>, pa.map_(pa.utf8(), pa.int64())),
 ])</code></pre>
 
             <h3>2. Create Writer and Write Batches</h3>
@@ -153,7 +154,10 @@ batch = pa.record_batch([
 
 <span class="cmt"># Reading back</span>
 rb = reader.read_row_group(<span class="num">0</span>)
-tags = rb.column(<span class="str">"tags"</span>).to_pylist()  <span class="cmt"># [[10, 20], None, []]</span></code></pre>
+tags = rb.column(<span class="str">"tags"</span>).to_pylist()  <span class="cmt"># [[10, 20], None, []]</span>
+
+<span class="cmt"># MAP columns</span>
+pa.array([{<span class="str">"x"</span>: <span class="num">1</span>, <span class="str">"y"</span>: <span class="num">2</span>}, <span class="kw">None</span>, {}], type=pa.map_(pa.utf8(), pa.int64()))</code></pre>
 
             <h3>WriterOptions</h3>
             <table>
diff --git a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 4b909db..4e7aea5 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -40,6 +40,7 @@ import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -1232,4 +1233,74 @@ public class MosaicRoundtripTest {
             }
         }
     }
+
+    @Test
+    public void testMapType() {
+        // Use MapVector's writer to avoid schema mismatch with UnionMapWriter
+        Field keyField = new Field("keys", FieldType.notNullable(new ArrowType.Int(32, true)), null);
+        Field valueField = new Field("values", FieldType.nullable(ArrowType.Utf8.INSTANCE), null);
+        Field entriesField = new Field("entries",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            Arrays.asList(keyField, valueField));
+        Field mapField = new Field("props",
+            new FieldType(true, new ArrowType.Map(false), null),
+            Arrays.asList(entriesField));
+
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.notNullable("id", new ArrowType.Int(32, true)),
+                mapField
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            MapVector mapVec = (MapVector) root.getVector("props");
+
+            ids.allocateNew(3);
+
+            IntVector keyVec = (IntVector) mapVec.getDataVector().getChildrenFromFields().get(0);
+            VarCharVector valVec = (VarCharVector) mapVec.getDataVector().getChildrenFromFields().get(1);
+
+            // Row 0: {1: "a", 2: "b"} -> offsets [0, 2]
+            mapVec.startNewValue(0);
+            keyVec.setSafe(0, 1);
+            valVec.setSafe(0, "a".getBytes());
+            keyVec.setSafe(1, 2);
+            valVec.setSafe(1, "b".getBytes());
+            mapVec.endValue(0, 2);
+
+            // Row 1: null
+            mapVec.setNull(1);
+
+            // Row 2: {} -> offsets [2, 2]
+            mapVec.startNewValue(2);
+            mapVec.endValue(2, 0);
+
+            ids.set(0, 1);
+            ids.set(1, 2);
+            ids.set(2, 3);
+
+            root.setRowCount(3);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(3, batch.getRowCount());
+
+                MapVector readMap = (MapVector) batch.getVector("props");
+                assertFalse(readMap.isNull(0));
+                assertTrue(readMap.isNull(1));
+                assertFalse(readMap.isNull(2));
+
+                // Row 0: 2 entries
+                java.util.List<?> row0 = readMap.getObject(0);
+                assertEquals(2, row0.size());
+
+                // Row 2: empty
+                java.util.List<?> row2 = readMap.getObject(2);
+                assertEquals(0, row2.size());
+            }
+        }
+    }
 }
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index de387c6..796e880 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -1058,3 +1058,59 @@ class TestWriter:
             assert nested[0] == [[1, 2], [3]]
             assert nested[1] == [[4]]
             assert nested[2] is None
+
+    def test_map_type(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32(), nullable=False),
+                pa.field("props", pa.map_(pa.int32(), pa.utf8())),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([1, 2, 3], type=pa.int32()),
+                pa.array(
+                    [[(1, "a"), (2, "b")], None, []],
+                    type=pa.map_(pa.int32(), pa.utf8()),
+                ),
+            ],
+            names=["id", "props"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 3
+
+            ids = rb.column("id").to_pylist()
+            assert ids == [1, 2, 3]
+
+            props = rb.column("props").to_pylist()
+            assert props[0] == [(1, "a"), (2, "b")]
+            assert props[1] is None
+            assert props[2] == []
+
+    def test_map_with_null_values(self):
+        pa_schema = pa.schema(
+            [pa.field("m", pa.map_(pa.utf8(), pa.int64()))]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(
+                    [[("x", 10), ("y", None)], [("z", 30)]],
+                    type=pa.map_(pa.utf8(), pa.int64()),
+                )
+            ],
+            names=["m"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            m = rb.column("m").to_pylist()
+            assert m[0] == [("x", 10), ("y", None)]
+            assert m[1] == [("z", 30)]

Reply via email to