This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new bf823e1 support MAP type with flattened columnar storage (#53)
bf823e1 is described below
commit bf823e1dcf5b7e1aff0333803e52b1843f629836
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Jun 7 09:53:26 2026 +0800
support MAP type with flattened columnar storage (#53)
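MAP<K, V> columns are decomposed into three physical columns within
the same bucket: lengths (INT32), keys (type K), and values (type V).
When V is itself an ARRAY or MAP, the values column stores V's INT32
lengths and V's own child columns follow as additional physical columns.
All three independently benefit from DICT/CONST/PLAIN encoding.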
Recursive nesting is fully supported (MAP keys themselves must not be ARRAY or MAP):
- ARRAY<MAP<K,V>>, MAP<K, ARRAY<V>>, MAP<K, MAP<K2,V2>>
- Arbitrary depth, e.g. MAP<STRING, MAP<STRING, ARRAY<MAP<STRING, STRING>>>>
The schema metadata round-trip preserves all MAP field names (the entries,
key, and value field names). Sorted MAPs are not supported and are rejected
at validation.
The format remains backward compatible (existing files stay readable), so no
version bump is needed. MAP uses the new type byte 19, which older readers
reject at schema validation.
---
core/src/bucket_reader.rs | 205 ++++++++-
core/src/bucket_writer.rs | 312 +++++++++++---
core/src/types.rs | 85 +++-
core/tests/array_type_test.rs | 456 +++++++++++++++++++++
core/tests/gen_fixtures.rs | 88 ++++
core/tests/testdata/v1_with_map.mosaic | Bin 0 -> 133 bytes
cpp/test_mosaic.cpp | 57 ++-
docs/cpp-api.html | 10 +-
docs/design.html | 42 +-
docs/java-api.html | 12 +
docs/python-api.html | 6 +-
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 71 ++++
python/tests/test_mosaic.py | 56 +++
13 files changed, 1305 insertions(+), 95 deletions(-)
diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index f81165e..e45b9fa 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -360,36 +360,199 @@ fn reassemble_list_columns(
_num_primary: usize,
num_rows: usize,
) {
- for child in children.iter().rev() {
- let phys_idx = child.physical_index;
+ let mut processed = vec![false; children.len()];
+
+ // Process innermost children first (highest physical index → lowest)
+ for idx in (0..children.len()).rev() {
+ if processed[idx] {
+ continue;
+ }
+ let child = &children[idx];
let parent = child.parent_logical_col;
- let values = arrays[phys_idx].clone();
-
- // Find the lengths column for this child:
- // - If the previous physical column (phys_idx - 1) is a child with
the same parent,
- // this is a nested child and lengths are at phys_idx - 1.
- // - Otherwise, this is a first-level child and lengths are at the
parent primary column.
- let is_nested = children
- .iter()
- .any(|c| c.physical_index == phys_idx - 1 && c.parent_logical_col
== parent);
- let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
-
- let lengths = arrays[lengths_idx].clone();
- let lengths_rows = lengths.len();
-
- let element_field = child.element_field.clone();
- let reassembled = reassemble_list_array(lengths, values,
element_field, lengths_rows);
- arrays[lengths_idx] = reassembled;
+ let phys_idx = child.physical_index;
+ let elem_dt = child.element_field.data_type();
+
+ // What type is above us (the container that produced this child)?
+ // Look at the element_field of the child that stores lengths for us,
+ // or the logical type at the parent position.
+ // For a first-level child, the lengths are at `parent` (primary
column).
+ // For a nested child (e.g., inner values of ARRAY<ARRAY<INT>>),
+ // the lengths are at the previous sibling that was already
reassembled.
+ //
+ // Key insight: the container type that produced this child determines
+ // whether it's an ARRAY child (1 child) or MAP child (2 children).
+ // We detect MAP by checking if the NEXT child (idx+1) has the same
+ // "lengths column" — meaning they're siblings of the same MAP.
+
+ // Determine if this child is part of a MAP pair.
+ // MAP produces two consecutive children: keys then values.
+ // Nested ARRAY produces children where the first child's element_field
+ // has a complex data_type (List/Map), indicating a deeper level.
+ //
+ // A "MAP sibling pair" is: two consecutive children with same parent,
+ // where the first child's element_field is NOT itself a complex type
+ // (complex element_field means it's a nested ARRAY/MAP intermediate).
+
+ let prev_is_map_key = idx > 0
+ && children[idx - 1].parent_logical_col == parent
+ && !processed[idx - 1]
+ && !matches!(
+ children[idx - 1].element_field.data_type(),
+ DataType::List(_) | DataType::Map(_, _)
+ );
+
+ if prev_is_map_key {
+ // This child is the VALUES of a MAP (the keys are at idx-1)
+ continue;
+ }
+
+ let next_is_map_value = idx + 1 < children.len()
+ && children[idx + 1].parent_logical_col == parent
+ && !processed[idx + 1]
+ && !matches!(
+ child.element_field.data_type(),
+ DataType::List(_) | DataType::Map(_, _)
+ );
+
+ if next_is_map_value {
+ // MAP: this child is KEYS, next child is VALUES
+ let val_child = &children[idx + 1];
+ let keys = arrays[phys_idx].clone();
+ let values = arrays[val_child.physical_index].clone();
+
+ // Find lengths column: a child with complex element_field at
phys_idx-1
+ // stores the MAP lengths (it was produced by expand_element for a
Map element).
+ let prev_child_is_lengths = children.iter().any(|c| {
+ c.physical_index == phys_idx - 1
+ && c.parent_logical_col == parent
+ && matches!(
+ c.element_field.data_type(),
+ DataType::List(_) | DataType::Map(_, _)
+ )
+ });
+ let lengths_idx = if prev_child_is_lengths {
+ phys_idx - 1
+ } else {
+ parent
+ };
+
+ let lengths = arrays[lengths_idx].clone();
+ let lengths_rows = lengths.len();
+
+ // Reconstruct the MAP type from the element_field
+ // The child at this position has element_field = key Field
+ // We need the full MAP entries_field. Get it from the container.
+ let container_dt = if lengths_idx < logical_types.len() {
+ &logical_types[lengths_idx]
+ } else {
+ // Nested: the reassembled array at lengths_idx should already
have
+ // the right type, but we need the MAP descriptor.
+ // Use the child's element_field to reconstruct.
+ elem_dt
+ };
+
+ if let DataType::Map(entries_field, sorted) = container_dt {
+ arrays[lengths_idx] = reassemble_map_array(
+ lengths,
+ keys,
+ values,
+ entries_field,
+ *sorted,
+ lengths_rows,
+ );
+ } else {
+ // Reconstruct MAP from key/value fields
+ let key_field = child.element_field.clone();
+ let val_field = val_child.element_field.clone();
+ let entries_field = Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ key_field.as_ref().clone(),
+ val_field.as_ref().clone(),
+ ])),
+ false,
+ ));
+ arrays[lengths_idx] = reassemble_map_array(
+ lengths,
+ keys,
+ values,
+ &entries_field,
+ false,
+ lengths_rows,
+ );
+ }
+ processed[idx] = true;
+ processed[idx + 1] = true;
+ } else {
+ // ARRAY: single child for values
+ let values = arrays[phys_idx].clone();
+ let is_nested = children
+ .iter()
+ .any(|c| c.physical_index == phys_idx - 1 &&
c.parent_logical_col == parent);
+ let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
+
+ let lengths = arrays[lengths_idx].clone();
+ let lengths_rows = lengths.len();
+ let element_field = child.element_field.clone();
+ arrays[lengths_idx] =
+ reassemble_list_array(lengths, values, element_field,
lengths_rows);
+ processed[idx] = true;
+ }
}
- // Handle ALL_NULL list columns (where no children were created because
all null)
+ // Handle ALL_NULL list/map columns
for (i, lt) in logical_types.iter().enumerate() {
- if matches!(lt, DataType::List(_)) && !children.iter().any(|c|
c.parent_logical_col == i) {
+ if (matches!(lt, DataType::List(_)) || matches!(lt, DataType::Map(_,
_)))
+ && !children.iter().any(|c| c.parent_logical_col == i)
+ {
arrays[i] = arrow_array::new_null_array(lt, num_rows);
}
}
}
+/// Rebuilds an Arrow `MapArray` from the flattened physical columns of a MAP.
+///
+/// * `lengths` — per-row entry counts; must downcast to `Int32Array` (panics otherwise).
+///   A null length marks a null map row and contributes 0 entries to the offsets.
+/// * `keys` / `values` — key and value columns for all rows, concatenated in row order.
+/// * `entries_field` — the map's entries field; its data type must be `Struct`
+///   (any other data type hits `unreachable!()`).
+/// * `sorted` — forwarded unchanged to `MapArray::new`.
+/// * `num_rows` — how many leading entries of `lengths` are turned into offsets.
+///
+/// NOTE(review): Arrow's `StructArray::new` / `MapArray::new` panic on inconsistent input
+/// (e.g. key/value data types that differ from `entries_field`'s struct fields, or a final
+/// offset that does not fit `keys.len()`), so callers must pass a matching `entries_field`.
+fn reassemble_map_array(
+ lengths: ArrayRef,
+ keys: ArrayRef,
+ values: ArrayRef,
+ entries_field: &Arc<Field>,
+ sorted: bool,
+ num_rows: usize,
+) -> ArrayRef {
+ let lengths_arr = lengths
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("map lengths must be Int32Array");
+
+ // Prefix-sum the lengths into Arrow offsets (num_rows + 1 values, starting at 0).
+ let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
+ offsets.push(0);
+ for i in 0..num_rows {
+ let len = if lengths_arr.is_null(i) {
+ 0
+ } else {
+ lengths_arr.value(i)
+ };
+ offsets.push(offsets.last().unwrap() + len);
+ }
+
+ // Map-level validity is taken directly from the lengths column's null bitmap.
+ let null_buf = lengths_arr.nulls().cloned();
+ // The entries struct gets no null bitmap of its own; nulls live on the map rows.
+ let entries = StructArray::new(
+ match entries_field.data_type() {
+ DataType::Struct(fields) => fields.clone(),
+ _ => unreachable!(),
+ },
+ vec![keys, values],
+ None,
+ );
+
+ Arc::new(MapArray::new(
+ entries_field.clone(),
+ OffsetBuffer::new(ScalarBuffer::from(offsets)),
+ entries,
+ null_buf,
+ sorted,
+ ))
+}
+
fn reassemble_list_array(
lengths: ArrayRef,
values: ArrayRef,
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index 4bfd606..a826bb6 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -153,43 +153,81 @@ impl BucketWriter {
let start_row = self.num_rows;
let mut total_size = 0;
- // Split List arrays into lengths + values, collect child writes
- let mut list_splits: Vec<(usize, Int32Array, ArrayRef)> = Vec::new();
+ // Split List/Map arrays into lengths + child values
+ struct ColSplit {
+ col: usize,
+ lengths: Int32Array,
+ child_arrays: Vec<ArrayRef>, // 1 for List, 2 for Map (keys +
values)
+ }
+ let mut splits: Vec<ColSplit> = Vec::new();
+ let mut seen_cols = Vec::new();
for child in &self.children {
let col = child.parent_logical_col;
- if list_splits.iter().any(|(c, _, _)| *c == col) {
- continue; // already split this column (nested lists share
parent)
+ if seen_cols.contains(&col) {
+ continue;
+ }
+ seen_cols.push(col);
+
+ match data_types[col] {
+ DataType::List(_) => {
+ let list_array = arrays[col]
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| {
+ io::Error::new(io::ErrorKind::InvalidInput,
"expected ListArray")
+ })?;
+ let lengths = extract_list_lengths(list_array);
+ let values = flatten_list_values(list_array);
+ splits.push(ColSplit {
+ col,
+ lengths,
+ child_arrays: vec![values],
+ });
+ }
+ DataType::Map(_, _) => {
+ let map_array =
+ arrays[col]
+ .as_any()
+ .downcast_ref::<MapArray>()
+ .ok_or_else(|| {
+ io::Error::new(io::ErrorKind::InvalidInput,
"expected MapArray")
+ })?;
+ let lengths = extract_map_lengths(map_array);
+ let (keys, values) = flatten_map_entries(map_array);
+ splits.push(ColSplit {
+ col,
+ lengths,
+ child_arrays: vec![keys, values],
+ });
+ }
+ _ => {}
}
- let list_array = arrays[col]
- .as_any()
- .downcast_ref::<ListArray>()
- .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput,
"expected ListArray"))?;
- let lengths = extract_list_lengths(list_array);
- let values = flatten_list_values(list_array);
- list_splits.push((col, lengths, values));
}
- // Write primary columns (lengths for ARRAY cols, regular data for
others)
+ // Write primary columns (lengths for ARRAY/MAP cols, regular data for
others)
let int32_dt = DataType::Int32;
for i in 0..self.num_primary {
- if let Some(split) = list_splits.iter().find(|(col, _, _)| *col ==
i) {
- total_size += self.append_array_column(i, &split.1, &int32_dt,
start_row)?;
+ if let Some(split) = splits.iter().find(|s| s.col == i) {
+ total_size += self.append_array_column(i, &split.lengths,
&int32_dt, start_row)?;
} else {
total_size += self.append_array_column(i, arrays[i],
data_types[i], start_row)?;
}
}
- // Recursively flatten all list levels and write to child columns
- let mut pending: Vec<(usize, ArrayRef)> = Vec::new(); // (child_index,
values)
- for split in &list_splits {
- let parent = split.0;
- // Find the first child for this parent
- if let Some(child_idx) = self
+ // Write child columns — for List: 1 child (values), for Map: 2
children (keys, values)
+ let mut pending: Vec<(usize, ArrayRef)> = Vec::new();
+ for split in &splits {
+ let parent_children: Vec<usize> = self
.children
.iter()
- .position(|c| c.parent_logical_col == parent)
- {
- pending.push((child_idx, split.2.clone()));
+ .enumerate()
+ .filter(|(_, c)| c.parent_logical_col == split.col)
+ .map(|(idx, _)| idx)
+ .collect();
+ for (i, child_arr) in split.child_arrays.iter().enumerate() {
+ if i < parent_children.len() {
+ pending.push((parent_children[i], child_arr.clone()));
+ }
}
}
@@ -200,24 +238,45 @@ impl BucketWriter {
let phys_idx = self.children[child_idx].physical_index;
let child_start = self.children[child_idx].num_elements;
- if let DataType::List(_) = values.data_type() {
- let inner_list =
values.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
- io::Error::new(io::ErrorKind::InvalidInput, "expected
ListArray")
- })?;
- let inner_lengths = extract_list_lengths(inner_list);
- let inner_values = flatten_list_values(inner_list);
- total_size +=
- self.append_array_column(phys_idx, &inner_lengths,
&int32_dt, child_start)?;
- self.children[child_idx].num_elements += inner_lengths.len();
- // Queue the inner values for the next child
- if child_idx + 1 < self.children.len() {
- pending.push((child_idx + 1, inner_values));
+ match values.data_type() {
+ DataType::List(_) => {
+ let inner_list =
+
values.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+ io::Error::new(io::ErrorKind::InvalidInput,
"expected ListArray")
+ })?;
+ let inner_lengths = extract_list_lengths(inner_list);
+ let inner_values = flatten_list_values(inner_list);
+ total_size +=
+ self.append_array_column(phys_idx, &inner_lengths,
&int32_dt, child_start)?;
+ self.children[child_idx].num_elements +=
inner_lengths.len();
+ if child_idx + 1 < self.children.len() {
+ pending.push((child_idx + 1, inner_values));
+ }
+ }
+ DataType::Map(_, _) => {
+ let inner_map =
+
values.as_any().downcast_ref::<MapArray>().ok_or_else(|| {
+ io::Error::new(io::ErrorKind::InvalidInput,
"expected MapArray")
+ })?;
+ let inner_lengths = extract_map_lengths(inner_map);
+ let (inner_keys, inner_values) =
flatten_map_entries(inner_map);
+ total_size +=
+ self.append_array_column(phys_idx, &inner_lengths,
&int32_dt, child_start)?;
+ self.children[child_idx].num_elements +=
inner_lengths.len();
+ // Queue keys and values for the next children
+ if child_idx + 2 < self.children.len() {
+ pending.push((child_idx + 2, inner_values));
+ }
+ if child_idx + 1 < self.children.len() {
+ pending.push((child_idx + 1, inner_keys));
+ }
+ }
+ _ => {
+ let elem_dt =
self.children[child_idx].element_field.data_type().clone();
+ total_size +=
+ self.append_array_column(phys_idx, values.as_ref(),
&elem_dt, child_start)?;
+ self.children[child_idx].num_elements += values.len();
}
- } else {
- let elem_dt =
self.children[child_idx].element_field.data_type().clone();
- total_size +=
- self.append_array_column(phys_idx, values.as_ref(),
&elem_dt, child_start)?;
- self.children[child_idx].num_elements += values.len();
}
}
@@ -1044,6 +1103,58 @@ fn i128_to_biginteger_bytes(val: i128) -> Vec<u8> {
bytes[start..].to_vec()
}
+/// Computes the number of entries in each row of `map_array` as an `Int32Array`.
+///
+/// Null rows are given length 0 regardless of their offset span, and the result
+/// reuses the map's null bitmap so row-level nulls are preserved in the lengths column.
+fn extract_map_lengths(map_array: &MapArray) -> Int32Array {
+ let offsets = map_array.value_offsets();
+ let num_rows = map_array.len();
+ let mut lengths = Vec::with_capacity(num_rows);
+ for i in 0..num_rows {
+ // A null row may still span entries in Arrow; record 0 so it owns none.
+ if map_array.is_null(i) {
+ lengths.push(0);
+ } else {
+ lengths.push(offsets[i + 1] - offsets[i]);
+ }
+ }
+ let null_buf = map_array.nulls().cloned();
+ Int32Array::new(ScalarBuffer::from(lengths), null_buf)
+}
+
+/// Extracts the keys and values of every non-null row of `map_array`, in row order.
+///
+/// Returns `(keys, values)`. Entries spanned by null rows are dropped, which keeps the
+/// output aligned with the lengths from `extract_map_lengths` (null rows count as 0).
+fn flatten_map_entries(map_array: &MapArray) -> (ArrayRef, ArrayRef) {
+ let offsets = map_array.value_offsets();
+ let num_rows = map_array.len();
+ let keys = map_array.keys();
+ let values = map_array.values();
+
+ // Fast path: no null rows, so all entries form one contiguous range that is sliced.
+ if map_array.null_count() == 0 {
+ let start = offsets[0] as usize;
+ let end = offsets[num_rows] as usize;
+ return (
+ keys.slice(start, end - start),
+ values.slice(start, end - start),
+ );
+ }
+
+ // Slow path: collect the entry indices of non-null rows only, skipping any
+ // entries that a null row may still span.
+ let mut indices: Vec<u32> = Vec::new();
+ for i in 0..num_rows {
+ if !map_array.is_null(i) {
+ let start = offsets[i] as u32;
+ let end = offsets[i + 1] as u32;
+ for idx in start..end {
+ indices.push(idx);
+ }
+ }
+ }
+
+ // Every row was null or empty: return empty slices that keep the original types.
+ if indices.is_empty() {
+ return (keys.slice(0, 0), values.slice(0, 0));
+ }
+
+ // Gather the surviving entries from both child columns with the same index list.
+ let idx_array = UInt32Array::from(indices);
+ (
+ take_array(keys.as_ref(), &idx_array),
+ take_array(values.as_ref(), &idx_array),
+ )
+}
+
fn extract_list_lengths(list_array: &ListArray) -> Int32Array {
let offsets = list_array.value_offsets();
let num_rows = list_array.len();
@@ -1268,6 +1379,58 @@ fn take_array(array: &dyn Array, indices: &UInt32Array)
-> ArrayRef {
null_buf,
))
}
+ DataType::Map(entries_field, sorted) => {
+ let src = array.as_any().downcast_ref::<MapArray>().unwrap();
+ let mut offsets_builder = vec![0i32];
+ let mut child_indices: Vec<u32> = Vec::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ let start = src.value_offsets()[idx] as u32;
+ let end = src.value_offsets()[idx + 1] as u32;
+ for ci in start..end {
+ child_indices.push(ci);
+ }
+ offsets_builder.push(child_indices.len() as i32);
+ }
+ let child_idx_arr = UInt32Array::from(child_indices);
+ let new_keys = take_array(src.keys().as_ref(), &child_idx_arr);
+ let new_values = take_array(src.values().as_ref(), &child_idx_arr);
+ let null_buf = if !indices.is_empty() {
+ let mut bm = vec![0u8; indices.len().div_ceil(8)];
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if !src.is_null(idx) {
+ bm[i / 8] |= 1 << (i % 8);
+ }
+ }
+ if bm.iter().all(|&b| b == 0xFF) || indices.is_empty() {
+ None
+ } else {
+ Some(NullBuffer::new(BooleanBuffer::new(
+ Buffer::from_vec(bm),
+ 0,
+ indices.len(),
+ )))
+ }
+ } else {
+ None
+ };
+ let entries_struct = StructArray::new(
+ match entries_field.data_type() {
+ DataType::Struct(fields) => fields.clone(),
+ _ => unreachable!(),
+ },
+ vec![new_keys, new_values],
+ None,
+ );
+ Arc::new(MapArray::new(
+ entries_field.clone(),
+ OffsetBuffer::new(ScalarBuffer::from(offsets_builder)),
+ entries_struct,
+ null_buf,
+ *sorted,
+ ))
+ }
other => panic!("take_array: unsupported DataType {:?}", other),
}
}
@@ -1276,7 +1439,7 @@ pub(crate) fn expand_col_types(col_types: &[&DataType])
-> (Vec<DataType>, Vec<C
let mut physical_types: Vec<DataType> = col_types
.iter()
.map(|t| {
- if matches!(t, DataType::List(_)) {
+ if matches!(t, DataType::List(_) | DataType::Map(_, _)) {
DataType::Int32
} else {
(*t).clone()
@@ -1286,14 +1449,34 @@ pub(crate) fn expand_col_types(col_types: &[&DataType])
-> (Vec<DataType>, Vec<C
let mut children = Vec::new();
for (i, t) in col_types.iter().enumerate() {
- if let DataType::List(element_field) = t {
- expand_child(i, element_field, &mut physical_types, &mut children);
- }
+ expand_type(i, t, &mut physical_types, &mut children);
}
(physical_types, children)
}
-fn expand_child(
+/// Appends the physical child columns for the complex type `dt` belonging to logical
+/// column `parent_logical`, recursing (via `expand_element`) into nested ARRAY/MAP types.
+///
+/// * `List` — one child subtree for the element field.
+/// * `Map` — the key subtree followed by the value subtree (keys always come first).
+/// * any other type — no child columns are added.
+///
+/// A `Map` whose entries field is not a `Struct` adds nothing here. The `fields[0]` /
+/// `fields[1]` indexing assumes exactly two entry fields; `validate_data_type` rejects
+/// both of those malformed shapes.
+fn expand_type(
+ parent_logical: usize,
+ dt: &DataType,
+ physical_types: &mut Vec<DataType>,
+ children: &mut Vec<ChildColumnMeta>,
+) {
+ match dt {
+ DataType::List(element_field) => {
+ expand_element(parent_logical, element_field, physical_types,
children);
+ }
+ DataType::Map(entries_field, _) => {
+ if let DataType::Struct(fields) = entries_field.data_type() {
+ // Keys child. validate_data_type rejects ARRAY/MAP keys, so this is
+ // normally a single leaf column.
+ expand_element(parent_logical, &fields[0], physical_types,
children);
+ // Values child — expands further when the value is an ARRAY or MAP.
+ expand_element(parent_logical, &fields[1], physical_types,
children);
+ }
+ }
+ _ => {}
+ }
+}
+
+fn expand_element(
parent_logical: usize,
element_field: &Arc<Field>,
physical_types: &mut Vec<DataType>,
@@ -1302,23 +1485,28 @@ fn expand_child(
let elem_dt = element_field.data_type();
let child_phys_idx = physical_types.len();
- if let DataType::List(inner_field) = elem_dt {
- physical_types.push(DataType::Int32);
- children.push(ChildColumnMeta {
- parent_logical_col: parent_logical,
- physical_index: child_phys_idx,
- element_field: element_field.clone(),
- num_elements: 0,
- });
- expand_child(parent_logical, inner_field, physical_types, children);
- } else {
- physical_types.push(elem_dt.clone());
- children.push(ChildColumnMeta {
- parent_logical_col: parent_logical,
- physical_index: child_phys_idx,
- element_field: element_field.clone(),
- num_elements: 0,
- });
+ match elem_dt {
+ DataType::List(_) | DataType::Map(_, _) => {
+ // Complex element: this child stores lengths (INT32), recurse for
deeper levels
+ physical_types.push(DataType::Int32);
+ children.push(ChildColumnMeta {
+ parent_logical_col: parent_logical,
+ physical_index: child_phys_idx,
+ element_field: element_field.clone(),
+ num_elements: 0,
+ });
+ expand_type(parent_logical, elem_dt, physical_types, children);
+ }
+ _ => {
+ // Primitive element: direct leaf column
+ physical_types.push(elem_dt.clone());
+ children.push(ChildColumnMeta {
+ parent_logical_col: parent_logical,
+ physical_index: child_phys_idx,
+ element_field: element_field.clone(),
+ num_elements: 0,
+ });
+ }
}
}
diff --git a/core/src/types.rs b/core/src/types.rs
index 9326c57..7f4a06d 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -104,12 +104,31 @@ pub fn validate_data_type(dt: &DataType) -> Result<(),
String> {
},
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) =>
Ok(()),
DataType::List(field) => {
- if let DataType::Struct(fields) = field.data_type() {
+ let elem = field.data_type();
+ if let DataType::Struct(fields) = elem {
if is_timestamp_nanos_struct(fields) {
return Err("ARRAY<legacy timestamp nanos struct> is not
supported".to_string());
}
}
- validate_data_type(field.data_type())
+ validate_data_type(elem)
+ }
+ DataType::Map(entries_field, sorted) => {
+ if *sorted {
+ return Err("sorted MAP is not supported".to_string());
+ }
+ if let DataType::Struct(fields) = entries_field.data_type() {
+ if fields.len() != 2 {
+ return Err("MAP entries struct must have exactly 2
fields".to_string());
+ }
+ let key_dt = fields[0].data_type();
+ if matches!(key_dt, DataType::List(_) | DataType::Map(_, _)) {
+ return Err("MAP key type cannot be ARRAY or
MAP".to_string());
+ }
+ validate_data_type(key_dt)?;
+ validate_data_type(fields[1].data_type())
+ } else {
+ Err("MAP entries field must be a Struct".to_string())
+ }
}
_ => Err(format!("unsupported DataType: {:?}", dt)),
}
@@ -133,6 +152,7 @@ pub fn data_type_to_type_byte(dt: &DataType) -> u8 {
DataType::Timestamp(_, Some(_)) => 17,
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 16,
DataType::List(_) => 18,
+ DataType::Map(_, _) => 19,
_ => panic!("unsupported DataType for serialization: {:?}", dt),
}
}
@@ -192,10 +212,52 @@ pub fn serialize_field(field: &Field, buf: &mut Vec<u8>) {
buf.extend_from_slice(name_bytes);
serialize_field(element_field, buf);
}
+ DataType::Map(entries_field, _sorted) => {
+ // entries field name
+ let entries_name = entries_field.name().as_bytes();
+ varint::encode(buf, entries_name.len() as u32);
+ buf.extend_from_slice(entries_name);
+ if let DataType::Struct(fields) = entries_field.data_type() {
+ // key field name + type
+ let key_name = fields[0].name().as_bytes();
+ varint::encode(buf, key_name.len() as u32);
+ buf.extend_from_slice(key_name);
+ serialize_field(&fields[0], buf);
+ // value field name + type
+ let val_name = fields[1].name().as_bytes();
+ varint::encode(buf, val_name.len() as u32);
+ buf.extend_from_slice(val_name);
+ serialize_field(&fields[1], buf);
+ }
+ }
_ => {}
}
}
+/// Reads a `len`-byte UTF-8 field name from `buf` at `*pos` and advances `*pos` past it.
+///
+/// `context` is used only to build error messages (e.g. "MAP key").
+///
+/// # Errors
+/// * `UnexpectedEof` if fewer than `len` bytes remain after `*pos`.
+/// * `InvalidData` if the bytes are not valid UTF-8.
+///
+/// `*pos` is left unchanged when an error is returned.
+///
+/// NOTE(review): `*pos + len` is not overflow-checked; a corrupt `len` close to
+/// `usize::MAX` could wrap past the bounds check — consider `checked_add`.
+fn read_utf8_field_name(
+ buf: &[u8],
+ pos: &mut usize,
+ len: usize,
+ context: &str,
+) -> Result<String, std::io::Error> {
+ if *pos + len > buf.len() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ format!("type: not enough bytes for {} field name", context),
+ ));
+ }
+ let name = std::str::from_utf8(&buf[*pos..*pos + len])
+ .map_err(|_| {
+ std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!("type: invalid UTF-8 in {} field name", context),
+ )
+ })?
+ .to_string();
+ *pos += len;
+ Ok(name)
+}
+
pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut usize) ->
Result<Field, std::io::Error> {
if *pos + 1 >= buf.len() {
return Err(std::io::Error::new(
@@ -293,6 +355,25 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut
usize) -> Result<Fiel
let element_field = deserialize_field(&element_name, buf, pos)?;
DataType::List(std::sync::Arc::new(element_field))
}
+ 19 => {
+ // entries field name
+ let entries_name_len = varint::decode(buf, pos)? as usize;
+ let entries_name = read_utf8_field_name(buf, pos,
entries_name_len, "MAP entries")?;
+ // key field name + type
+ let key_name_len = varint::decode(buf, pos)? as usize;
+ let key_name = read_utf8_field_name(buf, pos, key_name_len, "MAP
key")?;
+ let key_field = deserialize_field(&key_name, buf, pos)?;
+ // value field name + type
+ let val_name_len = varint::decode(buf, pos)? as usize;
+ let val_name = read_utf8_field_name(buf, pos, val_name_len, "MAP
value")?;
+ let value_field = deserialize_field(&val_name, buf, pos)?;
+ let entries_field = Field::new(
+ &entries_name,
+ DataType::Struct(Fields::from(vec![key_field, value_field])),
+ false,
+ );
+ DataType::Map(std::sync::Arc::new(entries_field), false)
+ }
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
diff --git a/core/tests/array_type_test.rs b/core/tests/array_type_test.rs
index ad899e2..53d9ba0 100644
--- a/core/tests/array_type_test.rs
+++ b/core/tests/array_type_test.rs
@@ -854,3 +854,459 @@ fn test_project_one_array_from_multi_array_paged() {
assert_eq!(r0.value(0), 1);
assert_eq!(r0.value(1), 2);
}
+
+// ======================== MAP Tests ========================
+
+/// Round-trips an INT32 id column plus a MAP<INT32, UTF8> column whose rows are a
+/// two-entry map, a null map, a map with a null value, and an empty map.
+#[test]
+fn test_map_int_string_basic() {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new(
+ "map",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", DataType::Utf8, true),
+ ])),
+ false,
+ )),
+ false,
+ ),
+ true,
+ ),
+ ]);
+
+ let ids = Int32Array::from(vec![1, 2, 3, 4]);
+
+ let key_builder = Int32Builder::new();
+ let value_builder = StringBuilder::new();
+ let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+
+ // row 0: {1: "a", 2: "b"}
+ map_builder.keys().append_value(1);
+ map_builder.values().append_value("a");
+ map_builder.keys().append_value(2);
+ map_builder.values().append_value("b");
+ map_builder.append(true).unwrap();
+
+ // row 1: null
+ map_builder.append(false).unwrap();
+
+ // row 2: {3: null}
+ map_builder.keys().append_value(3);
+ map_builder.values().append_null();
+ map_builder.append(true).unwrap();
+
+ // row 3: {} (empty)
+ map_builder.append(true).unwrap();
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(ids), Arc::new(map_builder.finish())],
+ )
+ .unwrap();
+
+ let result = roundtrip(&schema, &[batch]);
+ let rb = &result[0];
+ assert_eq!(rb.num_rows(), 4);
+
+ let map_col = rb.column(1).as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(map_col.len(), 4);
+ assert!(!map_col.is_null(0));
+ assert!(map_col.is_null(1));
+ assert!(!map_col.is_null(2));
+ assert!(!map_col.is_null(3));
+
+ // row 0: {1: "a", 2: "b"}
+ let keys0 = map_col
+ .keys()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let vals0 = map_col
+ .values()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(map_col.value_offsets()[0], 0);
+ assert_eq!(map_col.value_offsets()[1], 2);
+ assert_eq!(keys0.value(0), 1);
+ assert_eq!(keys0.value(1), 2);
+ assert_eq!(vals0.value(0), "a");
+ assert_eq!(vals0.value(1), "b");
+
+ // Rows 2 and 3 are checked via their entry counts (offset differences).
+ // row 2: {3: null}
+ assert_eq!(map_col.value_offsets()[3] - map_col.value_offsets()[2], 1);
+
+ // row 3: empty
+ assert_eq!(map_col.value_offsets()[4] - map_col.value_offsets()[3], 0);
+}
+
+/// A MAP<UTF8, INT64> column whose rows are all null must come back as a
+/// `MapArray` of the same length with every row still null.
+#[test]
+fn test_map_all_null() {
+ let entries_field = Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Int64, true),
+ ])),
+ false,
+ );
+ let schema = Schema::new(vec![Field::new(
+ "m",
+ DataType::Map(Arc::new(entries_field), false),
+ true,
+ )]);
+
+ let key_builder = StringBuilder::new();
+ let value_builder = Int64Builder::new();
+ let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+ // Two null rows and no entries at all.
+ map_builder.append(false).unwrap();
+ map_builder.append(false).unwrap();
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(map_builder.finish())],
+ )
+ .unwrap();
+
+ let result = roundtrip(&schema, &[batch]);
+ let col = result[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<MapArray>()
+ .unwrap();
+ assert_eq!(col.len(), 2);
+ assert!(col.is_null(0));
+ assert!(col.is_null(1));
+}
+
+/// Writes an INT64 column, an ARRAY<INT32> column and a MAP<UTF8, FLOAT64> column
+/// together with a single bucket, and checks each column survives the round-trip.
+#[test]
+fn test_map_with_other_columns() {
+ let entries_field = Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Float64, true),
+ ])),
+ false,
+ );
+ let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("tags", DataType::List(element_field.clone()), true),
+ Field::new("props", DataType::Map(Arc::new(entries_field), false),
true),
+ ]);
+
+ let ids = Int64Array::from(vec![1, 2]);
+
+ // tags: row 0 = [10], row 1 = null
+ let mut list_builder = ListBuilder::new(Int32Builder::new());
+ list_builder.values().append_value(10);
+ list_builder.append(true);
+ list_builder.append(false);
+
+ // props: row 0 = {"x": 1.5}, row 1 = {"y": 2.5, "z": 3.5}
+ let key_builder = StringBuilder::new();
+ let value_builder = Float64Builder::new();
+ let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+ map_builder.keys().append_value("x");
+ map_builder.values().append_value(1.5);
+ map_builder.append(true).unwrap();
+ map_builder.keys().append_value("y");
+ map_builder.values().append_value(2.5);
+ map_builder.keys().append_value("z");
+ map_builder.values().append_value(3.5);
+ map_builder.append(true).unwrap();
+
+ // Use a single bucket so all three columns are written together.
+ let mut opts = WriterOptions::default();
+ opts.num_buckets = 1;
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(ids),
+ Arc::new(list_builder.finish()),
+ Arc::new(map_builder.finish()),
+ ],
+ )
+ .unwrap();
+
+ let result = roundtrip_with_options(&schema, &[batch], opts);
+ let rb = &result[0];
+ assert_eq!(rb.num_rows(), 2);
+
+ let result_ids =
rb.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+ assert_eq!(result_ids.value(0), 1);
+
+ let result_tags =
rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+ assert!(!result_tags.is_null(0));
+ assert!(result_tags.is_null(1));
+
+ // Entry counts per row: 1 for row 0, 2 for row 1.
+ let result_props =
rb.column(2).as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(result_props.len(), 2);
+ assert_eq!(
+ result_props.value_offsets()[1] - result_props.value_offsets()[0],
+ 1
+ );
+ assert_eq!(
+ result_props.value_offsets()[2] - result_props.value_offsets()[1],
+ 2
+ );
+}
+
+// ======================== Nested ARRAY/MAP Tests ========================
+
+/// Round-trips ARRAY<MAP<INT32, UTF8>>: row 0 holds two maps with 1 and 2 entries,
+/// row 1 is a null list.
+#[test]
+fn test_array_of_map() {
+ // ARRAY<MAP<INT32, UTF8>>
+ let map_type = DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", DataType::Utf8, true),
+ ])),
+ false,
+ )),
+ false,
+ );
+ let schema = Schema::new(vec![Field::new(
+ "col",
+ DataType::List(Arc::new(Field::new("item", map_type.clone(), true))),
+ true,
+ )]);
+
+ // Build: row 0 = [{1:"a"}, {2:"b", 3:"c"}], row 1 = null
+ let key_builder = Int32Builder::new();
+ let val_builder = StringBuilder::new();
+ let map_builder = MapBuilder::new(None, key_builder, val_builder);
+ let mut list_builder = ListBuilder::new(map_builder);
+
+ // row 0: [{1:"a"}, {2:"b", 3:"c"}]
+ list_builder.values().keys().append_value(1);
+ list_builder.values().values().append_value("a");
+ list_builder.values().append(true).unwrap();
+ list_builder.values().keys().append_value(2);
+ list_builder.values().values().append_value("b");
+ list_builder.values().keys().append_value(3);
+ list_builder.values().values().append_value("c");
+ list_builder.values().append(true).unwrap();
+ list_builder.append(true);
+
+ // row 1: null
+ list_builder.append(false);
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(list_builder.finish())],
+ )
+ .unwrap();
+
+ let result = roundtrip(&schema, &[batch]);
+ let col = result[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ assert_eq!(col.len(), 2);
+ assert!(!col.is_null(0));
+ assert!(col.is_null(1));
+
+ // The list element of row 0 must itself be a MapArray with two maps.
+ let row0 = col.value(0);
+ let maps = row0.as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(maps.len(), 2);
+ assert_eq!(maps.value_length(0), 1); // {1:"a"}
+ assert_eq!(maps.value_length(1), 2); // {2:"b", 3:"c"}
+}
+
+// Roundtrips a MAP whose values are lists: one map with two list-valued
+// entries and one empty (non-null) map, then checks keys and list contents.
+#[test]
+fn test_map_with_array_value() {
+ // MAP<UTF8, ARRAY<INT32>>
+ let list_type = DataType::List(Arc::new(Field::new("item",
DataType::Int32, true)));
+ let schema = Schema::new(vec![Field::new(
+ "col",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", list_type.clone(), true),
+ ])),
+ false,
+ )),
+ false,
+ ),
+ true,
+ )]);
+
+ // Build: row 0 = {"x": [1,2], "y": [3]}, row 1 = {}
+ let key_builder = StringBuilder::new();
+ let val_builder = ListBuilder::new(Int32Builder::new());
+ let mut map_builder = MapBuilder::new(None, key_builder, val_builder);
+
+ // row 0
+ // Each key is pushed together with exactly one list value (elements first,
+ // then the list's append) before the map row is closed with append(true).
+ map_builder.keys().append_value("x");
+ map_builder.values().values().append_value(1);
+ map_builder.values().values().append_value(2);
+ map_builder.values().append(true);
+ map_builder.keys().append_value("y");
+ map_builder.values().values().append_value(3);
+ map_builder.values().append(true);
+ map_builder.append(true).unwrap();
+
+ // row 1: empty
+ // Closing a row with no pending entries yields a valid map of length 0.
+ map_builder.append(true).unwrap();
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(map_builder.finish())],
+ )
+ .unwrap();
+
+ let result = roundtrip(&schema, &[batch]);
+ let col = result[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<MapArray>()
+ .unwrap();
+ assert_eq!(col.len(), 2);
+ assert_eq!(col.value_length(0), 2); // 2 entries
+ assert_eq!(col.value_length(1), 0); // empty
+
+ // Row 1 contributes no entries, so the flattened keys are exactly row 0's.
+ let keys = col.keys().as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(keys.value(0), "x");
+ assert_eq!(keys.value(1), "y");
+
+ // `.clone()` gives an owned Int32Array that outlives the temporary ArrayRef
+ // returned by `vals.value(i)`.
+ let vals = col.values().as_any().downcast_ref::<ListArray>().unwrap();
+ let v0 = vals
+ .value(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .clone();
+ assert_eq!(v0.len(), 2);
+ assert_eq!(v0.value(0), 1);
+ assert_eq!(v0.value(1), 2);
+ let v1 = vals
+ .value(1)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .clone();
+ assert_eq!(v1.len(), 1);
+ assert_eq!(v1.value(0), 3);
+}
+
+// ======================== MAP Schema Validation Tests ========================
+
+#[test]
+fn test_map_custom_field_names_roundtrip() {
+    // Non-default entry/key/value field names must survive the schema roundtrip.
+    let entries_field = Arc::new(Field::new(
+        "my_entries",
+        DataType::Struct(arrow_schema::Fields::from(vec![
+            Field::new("k_custom", DataType::Int32, false),
+            Field::new("v_custom", DataType::Utf8, true),
+        ])),
+        false,
+    ));
+    let schema = Schema::new(vec![Field::new(
+        "m",
+        DataType::Map(entries_field, false),
+        true,
+    )]);
+
+    // Use the same custom names in the builder so the built column's type
+    // lines up with `schema`.
+    let names = arrow_array::builder::MapFieldNames {
+        entry: "my_entries".to_string(),
+        key: "k_custom".to_string(),
+        value: "v_custom".to_string(),
+    };
+    let mut builder = MapBuilder::new(Some(names), Int32Builder::new(), StringBuilder::new());
+    builder.keys().append_value(1);
+    builder.values().append_value("a");
+    builder.append(true).unwrap();
+
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(builder.finish())])
+        .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_schema = result[0].schema();
+    let (entries, sorted) = match result_schema.field(0).data_type() {
+        DataType::Map(entries, sorted) => (entries.clone(), *sorted),
+        other => panic!("expected map, got {:?}", other),
+    };
+    assert!(!sorted);
+    assert_eq!(entries.name(), "my_entries");
+
+    let fields = match entries.data_type() {
+        DataType::Struct(fields) => fields,
+        _ => panic!("entries should be struct"),
+    };
+    assert_eq!(fields[0].name(), "k_custom");
+    assert_eq!(fields[1].name(), "v_custom");
+}
+
+#[test]
+fn test_sorted_map_rejected() {
+    // A MAP declared as sorted must be refused when the writer is created.
+    let entries = Arc::new(Field::new(
+        "entries",
+        DataType::Struct(arrow_schema::Fields::from(vec![
+            Field::new("keys", DataType::Int32, false),
+            Field::new("values", DataType::Utf8, true),
+        ])),
+        false,
+    ));
+    // Second argument `true` marks the map as sorted.
+    let schema = Schema::new(vec![Field::new("m", DataType::Map(entries, true), true)]);
+
+    let err = match MosaicWriter::new(MemOutputFile::new(), &schema, WriterOptions::default()) {
+        Ok(_) => panic!("sorted MAP should be rejected"),
+        Err(e) => e,
+    };
+    assert!(
+        err.to_string().contains("sorted"),
+        "error should mention sorted: {}",
+        err
+    );
+}
+
+#[test]
+fn test_complex_map_key_rejected() {
+    // A MAP whose key type is an ARRAY must be refused when the writer is created.
+    let list_key = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
+    let entries = Arc::new(Field::new(
+        "entries",
+        DataType::Struct(arrow_schema::Fields::from(vec![
+            Field::new("keys", list_key, false),
+            Field::new("values", DataType::Utf8, true),
+        ])),
+        false,
+    ));
+    let schema = Schema::new(vec![Field::new("m", DataType::Map(entries, false), true)]);
+
+    let err = match MosaicWriter::new(MemOutputFile::new(), &schema, WriterOptions::default()) {
+        Ok(_) => panic!("complex MAP key should be rejected"),
+        Err(e) => e,
+    };
+    assert!(
+        err.to_string().contains("MAP key"),
+        "error should mention MAP key: {}",
+        err
+    );
+}
diff --git a/core/tests/gen_fixtures.rs b/core/tests/gen_fixtures.rs
index 2cd2df3..5ac49f8 100644
--- a/core/tests/gen_fixtures.rs
+++ b/core/tests/gen_fixtures.rs
@@ -216,3 +216,91 @@ fn test_v1_with_array_golden_readable() {
.clone();
assert_eq!(r3.len(), 0);
}
+
+/// Generate the deterministic MAP file.
+/// Schema: id(INT32 NOT NULL), props(MAP<INT32, UTF8>)
+/// Data: 3 rows — {1:"a", 2:"b"}, null, {}
+/// Options: num_buckets=1, compression=none
+///
+/// `test_v1_with_map_binary_stable` compares these bytes byte-for-byte with the
+/// committed `v1_with_map.mosaic` golden file. Any change to the schema, the
+/// data, the append order or the writer options here requires regenerating it.
+fn gen_with_map() -> Vec<u8> {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new(
+ "props",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", DataType::Utf8, true),
+ ])),
+ false,
+ )),
+ false,
+ ),
+ true,
+ ),
+ ]);
+
+ let ids = Int32Array::from(vec![1, 2, 3]);
+ let key_builder = Int32Builder::new();
+ let value_builder = StringBuilder::new();
+ let mut map_builder = MapBuilder::new(None, key_builder, value_builder);
+ // row 0: {1: "a", 2: "b"}
+ map_builder.keys().append_value(1);
+ map_builder.values().append_value("a");
+ map_builder.keys().append_value(2);
+ map_builder.values().append_value("b");
+ map_builder.append(true).unwrap();
+ // row 1: null
+ map_builder.append(false).unwrap();
+ // row 2: {} (empty)
+ map_builder.append(true).unwrap();
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(ids), Arc::new(map_builder.finish())],
+ )
+ .unwrap();
+
+ // Write into an in-memory sink so the raw file bytes can be returned.
+ let out = MemOutputFile::new();
+ // Pin the bucket count and disable compression (0) to keep output stable.
+ let opts = WriterOptions {
+ num_buckets: 1,
+ compression: 0,
+ ..WriterOptions::default()
+ };
+ let mut writer = MosaicWriter::new(out, &schema, opts).unwrap();
+ writer.write_batch(&batch).unwrap();
+ writer.close().unwrap();
+ // Return a copy of the bytes accumulated in the in-memory output after close().
+ writer.output().buf.clone()
+}
+
+#[test]
+fn test_v1_with_map_binary_stable() {
+    // Regenerate the MAP fixture and compare it byte-for-byte with the committed
+    // golden file; any difference means the on-disk MAP encoding changed.
+    let generated = gen_with_map();
+    let golden = std::fs::read(golden_path("v1_with_map.mosaic"))
+        .expect("golden file missing — regenerate with gen_with_map()");
+    assert_eq!(
+        generated, golden,
+        "MAP file differs from golden — format may have changed unintentionally"
+    );
+}
+
+#[test]
+fn test_v1_with_map_golden_readable() {
+    // The committed golden file must stay readable and decode to exactly the
+    // data written by gen_with_map(): ids 1..=3 and {1:"a", 2:"b"}, null, {}.
+    let data = std::fs::read(golden_path("v1_with_map.mosaic")).unwrap();
+    // Take the length before moving the buffer into the input (no copy needed).
+    let len = data.len() as u64;
+    let input = ByteArrayInputFile { data };
+    let reader = MosaicReader::new(input, len).unwrap();
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let rb = rg.read_columns().unwrap();
+    assert_eq!(rb.num_rows(), 3);
+
+    let ids = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(ids, &Int32Array::from(vec![1, 2, 3]));
+
+    let map_col = rb.column(1).as_any().downcast_ref::<MapArray>().unwrap();
+    assert!(!map_col.is_null(0));
+    assert!(map_col.is_null(1));
+    assert!(!map_col.is_null(2));
+
+    // Row 0: 2 entries
+    assert_eq!(map_col.value_offsets()[1] - map_col.value_offsets()[0], 2);
+    // Row 2: empty
+    assert_eq!(map_col.value_offsets()[3] - map_col.value_offsets()[2], 0);
+
+    // Row 0 contents must decode to {1: "a", 2: "b"}, in order.
+    let row0 = map_col.value(0);
+    assert_eq!(row0.len(), 2);
+    let keys = row0
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap();
+    let vals = row0
+        .column(1)
+        .as_any()
+        .downcast_ref::<arrow_array::StringArray>()
+        .unwrap();
+    assert_eq!(keys.value(0), 1);
+    assert_eq!(keys.value(1), 2);
+    assert_eq!(vals.value(0), "a");
+    assert_eq!(vals.value(1), "b");
+}
diff --git a/core/tests/testdata/v1_with_map.mosaic
b/core/tests/testdata/v1_with_map.mosaic
new file mode 100644
index 0000000..efe7a2f
Binary files /dev/null and b/core/tests/testdata/v1_with_map.mosaic differ
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 4f85291..38cf55c 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -1005,6 +1005,60 @@ static void test_array_string_elements() {
printf(" PASS test_array_string_elements\n");
}
+static void test_map_type() {
+    // MAP<INT32, UTF8> roundtrip covering a populated map, a null map and an
+    // empty map; checks per-row lengths and the flattened key/value contents.
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("props", arrow::map(arrow::int32(), arrow::utf8())),
+    });
+
+    // Builder calls use ASSERT_TRUE rather than assert(): with NDEBUG, assert()
+    // expands to nothing and the Append side effects would be skipped entirely.
+    arrow::Int32Builder id_b;
+    ASSERT_TRUE(id_b.Append(1).ok());
+    ASSERT_TRUE(id_b.Append(2).ok());
+    ASSERT_TRUE(id_b.Append(3).ok());
+
+    auto key_builder = std::make_shared<arrow::Int32Builder>();
+    auto val_builder = std::make_shared<arrow::StringBuilder>();
+    arrow::MapBuilder map_b(arrow::default_memory_pool(), key_builder, val_builder);
+
+    // Row 0: {1: "a", 2: "b"}
+    ASSERT_TRUE(map_b.Append().ok());
+    ASSERT_TRUE(key_builder->Append(1).ok());
+    ASSERT_TRUE(val_builder->Append("a").ok());
+    ASSERT_TRUE(key_builder->Append(2).ok());
+    ASSERT_TRUE(val_builder->Append("b").ok());
+
+    // Row 1: null
+    ASSERT_TRUE(map_b.AppendNull().ok());
+
+    // Row 2: {} (empty)
+    ASSERT_TRUE(map_b.Append().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        id_b.Finish().ValueUnsafe(),
+        map_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 3);
+
+    auto props = std::static_pointer_cast<arrow::MapArray>(rb->GetColumnByName("props"));
+    ASSERT_TRUE(!props->IsNull(0));
+    ASSERT_TRUE(props->IsNull(1));
+    ASSERT_TRUE(!props->IsNull(2));
+
+    // Row 0: 2 entries
+    ASSERT_EQ(props->value_length(0), 2);
+    // Row 2: empty
+    ASSERT_EQ(props->value_length(2), 0);
+
+    // Row 0 entries must decode in order as {1: "a", 2: "b"}. keys()/items()
+    // are the flattened child arrays, so index them from row 0's offset.
+    auto keys = std::static_pointer_cast<arrow::Int32Array>(props->keys());
+    auto items = std::static_pointer_cast<arrow::StringArray>(props->items());
+    const int64_t start = props->value_offset(0);
+    ASSERT_EQ(keys->Value(start), 1);
+    ASSERT_EQ(keys->Value(start + 1), 2);
+    ASSERT_TRUE(items->GetString(start) == "a");
+    ASSERT_TRUE(items->GetString(start + 1) == "b");
+
+    printf(" PASS test_map_type\n");
+}
+
int main() {
printf("Running Mosaic C++ tests...\n");
test_basic_roundtrip();
@@ -1025,6 +1079,7 @@ int main() {
test_array_type();
test_array_with_null_elements();
test_array_string_elements();
- printf("All %d tests passed.\n", 18);
+ test_map_type();
+ printf("All %d tests passed.\n", 19);
return 0;
}
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index 0d08ebb..ed3456e 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -182,7 +182,15 @@ list_b.Append();
<span class="cmt">// Reading back</span>
<span class="kw">auto</span> tags =
std::static_pointer_cast<arrow::ListArray>(rb->GetColumnByName(<span
class="str">"tags"</span>));
<span class="kw">auto</span> row0 =
std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(<span
class="num">0</span>));
-<span class="cmt">// row0: [10, 20]</span></code></pre>
+<span class="cmt">// row0: [10, 20]</span>
+
+<span class="cmt">// MAP columns use MapBuilder</span>
+<span class="kw">auto</span> key_b =
std::make_shared<arrow::Int32Builder>();
+<span class="kw">auto</span> val_b =
std::make_shared<arrow::StringBuilder>();
+arrow::MapBuilder map_b(arrow::default_memory_pool(), key_b, val_b);
+map_b.Append();
+key_b->Append(<span class="num">1</span>);
+val_b->Append(<span class="str">"a"</span>);</code></pre>
<h2>C++ API Reference</h2>
diff --git a/docs/design.html b/docs/design.html
index 19ffd81..538a3b0 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -740,9 +740,10 @@ repeated numColumns times:
<tr><td>16</td><td>TIMESTAMP</td><td>varint
precision</td></tr>
<tr><td>17</td><td>TIMESTAMP_LTZ</td><td>varint precision,
varint timezoneLength, bytes timezone</td></tr>
<tr><td>18</td><td>ARRAY</td><td>varint nameLength, bytes
name (element field name), TypeDescriptor (recursive element type)</td></tr>
+ <tr><td>19</td><td>MAP</td><td>varint entriesNameLen +
bytes entriesName, varint keyNameLen + bytes keyName + TypeDescriptor (key),
varint valNameLen + bytes valName + TypeDescriptor (value). Sorted MAP is not
supported; always unsorted.</td></tr>
</tbody>
</table>
- <p>MAP, ROW, VARIANT, and BLOB are not yet supported.</p>
+ <p>ROW, VARIANT, and BLOB are not yet supported.</p>
<!-- ============================================================
-->
<h2>Value Serialization</h2>
@@ -767,6 +768,7 @@ repeated numColumns times:
<tr><td>CHAR / VARCHAR / STRING</td><td>varint length +
UTF-8 bytes</td></tr>
<tr><td>BINARY / VARBINARY / BYTES</td><td>varint length +
raw bytes</td></tr>
<tr><td>ARRAY</td><td>Flattened columnar: lengths (INT32)
+ values column (see ARRAY Type Storage below)</td></tr>
+ <tr><td>MAP</td><td>Flattened columnar: lengths (INT32) +
keys column + values column (see MAP Type Storage below)</td></tr>
</tbody>
</table>
<p>
@@ -857,7 +859,31 @@ repeated numChildren times:
[column slots: each independently compressed, including child
columns]</code></pre>
<h3>Statistics</h3>
- <p>ARRAY columns do not support min/max statistics (no meaningful
ordering).</p>
+ <p>ARRAY and MAP columns do not support min/max statistics (no
meaningful ordering).</p>
+
+ <!-- ============================================================
-->
+ <h2>MAP Type Storage</h2>
+ <p>
+ MAP columns use the same flattened columnar approach as ARRAY.
+ A <code>MAP<K, V></code> column is decomposed into three
physical columns
+ within the same bucket:
+ </p>
+ <ol>
+ <li><strong>Lengths column</strong> (INT32, N entries): the
number of key-value pairs in each map.</li>
+ <li><strong>Keys column</strong> (type K, M entries): all keys
flattened across all rows.</li>
+ <li><strong>Values column</strong> (type V, M entries): all
values flattened across all rows.</li>
+ </ol>
+ <p>
+ All three columns independently benefit from standard column
encoding (DICT, CONST, PLAIN, ALL_NULL).
+ Null maps are represented by a null in the lengths column.
Empty maps have length 0.
+ </p>
+
+ <h4>Example</h4>
+<pre><code>MAP<INT, UTF8> with rows: {1:"a", 2:"b"}, null, {3:"c"}
+
+Lengths column (INT32): 2, null, 1
+Keys column (INT32): 1, 2, 3 ← shared DICT across all maps
+Values column (UTF8): "a", "b", "c" ← shared DICT across all
maps</code></pre>
<h3>Design Rationale: Comparison with Parquet and ORC</h3>
<p>
@@ -906,10 +932,12 @@ repeated numChildren times:
<h4>Why Not Parquet’s Repetition/Definition Levels?</h4>
<p>
- Parquet uses the Dremel encoding: regardless of nesting depth,
each ARRAY column
- produces a single physical column with repetition and
definition level arrays attached
- to every leaf value. This has the advantage of constant
physical column count
- (always 1 per ARRAY column), but significant disadvantages:
+ Parquet uses the Dremel encoding: each leaf column carries
repetition and definition
+ level arrays that encode the full nesting structure. For
<code>ARRAY<primitive></code>,
+ this means one physical leaf column; for nested types like
<code>ARRAY<MAP<K,V>></code>,
+ each leaf (K and V) gets its own column with deeper rep/def
levels. The physical column
+ count equals the number of leaf fields, not the number of
ARRAY/MAP columns.
+ This approach has some advantages but significant
disadvantages:
</p>
<ul>
<li><strong>Implementation complexity</strong>: The shredding
(decomposition) and assembly
@@ -948,7 +976,7 @@ repeated numChildren times:
<!-- ============================================================
-->
<h2>Limitations</h2>
<ol>
- <li>Complex types MAP, MULTISET, and ROW are not yet
supported. ARRAY is supported with flattened columnar storage.</li>
+ <li>Complex types MULTISET and ROW are not yet supported.
ARRAY and MAP are supported with flattened columnar storage.</li>
<li>Mosaic format is designed for wide tables and may not be
efficient for narrow tables with few columns.</li>
</ol>
</div>
diff --git a/docs/java-api.html b/docs/java-api.html
index f8f207a..180040e 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -159,6 +159,18 @@ listWriter.endList();</code></pre>
<span class="ty">List</span><?> row0 = readTags.getObject(<span
class="num">0</span>); <span class="cmt">// [10, 20, 30]</span>
<span class="kw">boolean</span> isNull = readTags.isNull(<span
class="num">1</span>); <span class="cmt">// true</span>
<span class="ty">List</span><?> row2 = readTags.getObject(<span
class="num">2</span>); <span class="cmt">// [] (empty)</span></code></pre>
+ <p>
+ MAP types use Arrow’s <code>MapVector</code> and
<code>UnionMapWriter</code>:
+ </p>
+<pre><code><span class="ty">MapVector</span> mapVec = (<span
class="ty">MapVector</span>) root.getVector(<span class="str">"props"</span>);
+<span class="ty">UnionMapWriter</span> w = mapVec.getWriter();
+w.setPosition(<span class="num">0</span>);
+w.startMap();
+w.startEntry();
+w.key().integer().writeInt(<span class="num">1</span>);
+w.value().varChar().writeVarChar(<span class="kw">new</span> <span
class="ty">Text</span>(<span class="str">"a"</span>));
+w.endEntry();
+w.endMap();</code></pre>
<h3>2. Create Writer and Write Batches</h3>
<p>
diff --git a/docs/python-api.html b/docs/python-api.html
index fd849dd..e64976b 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -103,6 +103,7 @@ pa_schema = pa.schema([
pa.field(<span class="str">"id"</span>, pa.int32()),
pa.field(<span class="str">"tags"</span>, pa.list_(pa.int32())),
pa.field(<span class="str">"nested"</span>, pa.list_(pa.list_(pa.utf8()))),
+ pa.field(<span class="str">"props"</span>, pa.map_(pa.utf8(), pa.int64())),
])</code></pre>
<h3>2. Create Writer and Write Batches</h3>
@@ -153,7 +154,10 @@ batch = pa.record_batch([
<span class="cmt"># Reading back</span>
rb = reader.read_row_group(<span class="num">0</span>)
-tags = rb.column(<span class="str">"tags"</span>).to_pylist() <span
class="cmt"># [[10, 20], None, []]</span></code></pre>
+tags = rb.column(<span class="str">"tags"</span>).to_pylist() <span
class="cmt"># [[10, 20], None, []]</span>
+
+<span class="cmt"># MAP columns</span>
+pa.array([{<span class="str">"x"</span>: <span class="num">1</span>, <span
class="str">"y"</span>: <span class="num">2</span>}, <span
class="kw">None</span>, {}], type=pa.map_(pa.utf8(), pa.int64()))</code></pre>
<h3>WriterOptions</h3>
<table>
diff --git
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 4b909db..4e7aea5 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -40,6 +40,7 @@ import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
@@ -1232,4 +1233,74 @@ public class MosaicRoundtripTest {
}
}
}
+
+    @Test
+    public void testMapType() {
+        // Build the MAP column by filling the entries' key/value child vectors
+        // directly and delimiting each row with startNewValue/endValue.
+        Field keyField = new Field("keys", FieldType.notNullable(new ArrowType.Int(32, true)), null);
+        Field valueField = new Field("values", FieldType.nullable(ArrowType.Utf8.INSTANCE), null);
+        Field entriesField = new Field("entries",
+                new FieldType(false, ArrowType.Struct.INSTANCE, null),
+                Arrays.asList(keyField, valueField));
+        Field mapField = new Field("props",
+                new FieldType(true, new ArrowType.Map(false), null),
+                Arrays.asList(entriesField));
+
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.notNullable("id", new ArrowType.Int(32, true)),
+                mapField
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            MapVector mapVec = (MapVector) root.getVector("props");
+
+            ids.allocateNew(3);
+
+            org.apache.arrow.vector.complex.StructVector entries =
+                    (org.apache.arrow.vector.complex.StructVector) mapVec.getDataVector();
+            IntVector keyVec = (IntVector) entries.getChildrenFromFields().get(0);
+            VarCharVector valVec = (VarCharVector) entries.getChildrenFromFields().get(1);
+
+            // Row 0: {1: "a", 2: "b"} -> offsets [0, 2]
+            // Writing the children directly does not touch the entries struct's
+            // validity bitmap, so each entry is marked defined explicitly;
+            // otherwise the entries would be null structs.
+            mapVec.startNewValue(0);
+            keyVec.setSafe(0, 1);
+            valVec.setSafe(0, "a".getBytes());
+            entries.setIndexDefined(0);
+            keyVec.setSafe(1, 2);
+            valVec.setSafe(1, "b".getBytes());
+            entries.setIndexDefined(1);
+            mapVec.endValue(0, 2);
+
+            // Row 1: null
+            mapVec.setNull(1);
+
+            // Row 2: {} -> offsets [2, 2]
+            mapVec.startNewValue(2);
+            mapVec.endValue(2, 0);
+
+            ids.set(0, 1);
+            ids.set(1, 2);
+            ids.set(2, 3);
+
+            root.setRowCount(3);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(3, batch.getRowCount());
+
+                MapVector readMap = (MapVector) batch.getVector("props");
+                assertFalse(readMap.isNull(0));
+                assertTrue(readMap.isNull(1));
+                assertFalse(readMap.isNull(2));
+
+                // Row 0: 2 entries, keys/values preserved in order
+                java.util.List<?> row0 = readMap.getObject(0);
+                assertEquals(2, row0.size());
+                java.util.Map<?, ?> e0 = (java.util.Map<?, ?>) row0.get(0);
+                java.util.Map<?, ?> e1 = (java.util.Map<?, ?>) row0.get(1);
+                assertEquals(1, e0.get("keys"));
+                assertEquals("a", String.valueOf(e0.get("values")));
+                assertEquals(2, e1.get("keys"));
+                assertEquals("b", String.valueOf(e1.get("values")));
+
+                // Row 2: empty
+                java.util.List<?> row2 = readMap.getObject(2);
+                assertEquals(0, row2.size());
+            }
+        }
+    }
}
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index de387c6..796e880 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -1058,3 +1058,59 @@ class TestWriter:
assert nested[0] == [[1, 2], [3]]
assert nested[1] == [[4]]
assert nested[2] is None
+
+ def test_map_type(self):
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32(), nullable=False),
+ pa.field("props", pa.map_(pa.int32(), pa.utf8())),
+ ]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array([1, 2, 3], type=pa.int32()),
+ pa.array(
+ [[(1, "a"), (2, "b")], None, []],
+ type=pa.map_(pa.int32(), pa.utf8()),
+ ),
+ ],
+ names=["id", "props"],
+ )
+
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ rb = reader.read_row_group(0)
+ assert rb.num_rows == 3
+
+ ids = rb.column("id").to_pylist()
+ assert ids == [1, 2, 3]
+
+ props = rb.column("props").to_pylist()
+ assert props[0] == [(1, "a"), (2, "b")]
+ assert props[1] is None
+ assert props[2] == []
+
+ def test_map_with_null_values(self):
+ pa_schema = pa.schema(
+ [pa.field("m", pa.map_(pa.utf8(), pa.int64()))]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array(
+ [[("x", 10), ("y", None)], [("z", 30)]],
+ type=pa.map_(pa.utf8(), pa.int64()),
+ )
+ ],
+ names=["m"],
+ )
+
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ rb = reader.read_row_group(0)
+ m = rb.column("m").to_pylist()
+ assert m[0] == [("x", 10), ("y", None)]
+ assert m[1] == [("z", 30)]