This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 961e74f Refactor complex child column layout metadata (#55)
961e74f is described below
commit 961e74fe128dbd800a30c80cd9681180d57966a1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 17:31:11 2026 +0800
Refactor complex child column layout metadata (#55)
---
core/src/bucket_reader.rs | 192 +++++++++++++++---------------------------
core/src/bucket_writer.rs | 192 +++++++++++++++++++++++++++++++-----------
core/tests/array_type_test.rs | 115 +++++++++++++++++++++++++
3 files changed, 329 insertions(+), 170 deletions(-)
diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index e45b9fa..2196fe2 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -368,135 +368,81 @@ fn reassemble_list_columns(
continue;
}
let child = &children[idx];
- let parent = child.parent_logical_col;
- let phys_idx = child.physical_index;
- let elem_dt = child.element_field.data_type();
-
- // What type is above us (the container that produced this child)?
- // Look at the element_field of the child that stores lengths for us,
- // or the logical type at the parent position.
- // For a first-level child, the lengths are at `parent` (primary column).
- // For a nested child (e.g., inner values of ARRAY<ARRAY<INT>>),
- // the lengths are at the previous sibling that was already reassembled.
- //
- // Key insight: the container type that produced this child determines
- // whether it's an ARRAY child (1 child) or MAP child (2 children).
- // We detect MAP by checking if the NEXT child (idx+1) has the same
- // "lengths column" — meaning they're siblings of the same MAP.
-
- // Determine if this child is part of a MAP pair.
- // MAP produces two consecutive children: keys then values.
- // Nested ARRAY produces children where the first child's element_field
- // has a complex data_type (List/Map), indicating a deeper level.
- //
- // A "MAP sibling pair" is: two consecutive children with same parent,
- // where the first child's element_field is NOT itself a complex type
- // (complex element_field means it's a nested ARRAY/MAP intermediate).
-
- let prev_is_map_key = idx > 0
- && children[idx - 1].parent_logical_col == parent
- && !processed[idx - 1]
- && !matches!(
- children[idx - 1].element_field.data_type(),
- DataType::List(_) | DataType::Map(_, _)
- );
-
- if prev_is_map_key {
- // This child is the VALUES of a MAP (the keys are at idx-1)
- continue;
- }
-
- let next_is_map_value = idx + 1 < children.len()
- && children[idx + 1].parent_logical_col == parent
- && !processed[idx + 1]
- && !matches!(
- child.element_field.data_type(),
- DataType::List(_) | DataType::Map(_, _)
- );
-
- if next_is_map_value {
- // MAP: this child is KEYS, next child is VALUES
- let val_child = &children[idx + 1];
- let keys = arrays[phys_idx].clone();
- let values = arrays[val_child.physical_index].clone();
-
- // Find lengths column: a child with complex element_field at phys_idx-1
- // stores the MAP lengths (it was produced by expand_element for a Map element).
- let prev_child_is_lengths = children.iter().any(|c| {
- c.physical_index == phys_idx - 1
- && c.parent_logical_col == parent
- && matches!(
- c.element_field.data_type(),
- DataType::List(_) | DataType::Map(_, _)
- )
- });
- let lengths_idx = if prev_child_is_lengths {
- phys_idx - 1
- } else {
- parent
- };
-
- let lengths = arrays[lengths_idx].clone();
- let lengths_rows = lengths.len();
+ match child.role {
+ ChildColumnRole::MapValue => {
+ let key_idx = children.iter().position(|candidate| {
+ candidate.parent_logical_col == child.parent_logical_col
+ && candidate.length_physical_index == child.length_physical_index
+ && candidate.role == ChildColumnRole::MapKey
+ });
+ let Some(key_idx) = key_idx else {
+ continue;
+ };
+ if processed[key_idx] {
+ continue;
+ }
- // Reconstruct the MAP type from the element_field
- // The child at this position has element_field = key Field
- // We need the full MAP entries_field. Get it from the container.
- let container_dt = if lengths_idx < logical_types.len() {
- &logical_types[lengths_idx]
- } else {
- // Nested: the reassembled array at lengths_idx should already have
- // the right type, but we need the MAP descriptor.
- // Use the child's element_field to reconstruct.
- elem_dt
- };
+ let key_child = &children[key_idx];
+ let lengths_idx = child.length_physical_index;
+ let lengths = arrays[lengths_idx].clone();
+ let keys = arrays[key_child.physical_index].clone();
+ let values = arrays[child.physical_index].clone();
+ let lengths_rows = lengths.len();
+
+ let container_dt = if lengths_idx < logical_types.len() {
+ &logical_types[lengths_idx]
+ } else if let Some(length_child) =
+ children.iter().find(|c| c.physical_index == lengths_idx)
+ {
+ length_child.element_field.data_type()
+ } else {
+ key_child.element_field.data_type()
+ };
- if let DataType::Map(entries_field, sorted) = container_dt {
- arrays[lengths_idx] = reassemble_map_array(
- lengths,
- keys,
- values,
- entries_field,
- *sorted,
- lengths_rows,
- );
- } else {
- // Reconstruct MAP from key/value fields
- let key_field = child.element_field.clone();
- let val_field = val_child.element_field.clone();
- let entries_field = Arc::new(Field::new(
- "entries",
- DataType::Struct(arrow_schema::Fields::from(vec![
- key_field.as_ref().clone(),
- val_field.as_ref().clone(),
- ])),
- false,
- ));
- arrays[lengths_idx] = reassemble_map_array(
+ if let DataType::Map(entries_field, sorted) = container_dt {
+ arrays[lengths_idx] = reassemble_map_array(
+ lengths,
+ keys,
+ values,
+ entries_field,
+ *sorted,
+ lengths_rows,
+ );
+ } else {
+ let entries_field = Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ key_child.element_field.as_ref().clone(),
+ child.element_field.as_ref().clone(),
+ ])),
+ false,
+ ));
+ arrays[lengths_idx] = reassemble_map_array(
+ lengths,
+ keys,
+ values,
+ &entries_field,
+ false,
+ lengths_rows,
+ );
+ }
+ processed[idx] = true;
+ processed[key_idx] = true;
+ }
+ ChildColumnRole::MapKey => {}
+ ChildColumnRole::ListElement => {
+ let lengths_idx = child.length_physical_index;
+ let lengths = arrays[lengths_idx].clone();
+ let values = arrays[child.physical_index].clone();
+ let lengths_rows = lengths.len();
+ arrays[lengths_idx] = reassemble_list_array(
lengths,
- keys,
values,
- &entries_field,
- false,
+ child.element_field.clone(),
lengths_rows,
);
+ processed[idx] = true;
}
- processed[idx] = true;
- processed[idx + 1] = true;
- } else {
- // ARRAY: single child for values
- let values = arrays[phys_idx].clone();
- let is_nested = children
- .iter()
- .any(|c| c.physical_index == phys_idx - 1 && c.parent_logical_col == parent);
- let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
-
- let lengths = arrays[lengths_idx].clone();
- let lengths_rows = lengths.len();
- let element_field = child.element_field.clone();
- arrays[lengths_idx] =
- reassemble_list_array(lengths, values, element_field, lengths_rows);
- processed[idx] = true;
}
}
@@ -580,7 +526,7 @@ fn reassemble_list_array(
Arc::new(ListArray::new(element_field, offset_buf, values, null_buf))
}
-use crate::bucket_writer::{expand_col_types, ChildColumnMeta};
+use crate::bucket_writer::{expand_col_types, ChildColumnMeta, ChildColumnRole};
pub struct BucketReader {
data: Vec<u8>,
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index a826bb6..487a1a3 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -37,10 +37,19 @@ pub struct PagedBucketOutput {
pub children: Vec<ChildColumnMeta>,
}
-#[derive(Clone)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum ChildColumnRole {
+ ListElement,
+ MapKey,
+ MapValue,
+}
+
+#[derive(Clone, Debug)]
pub struct ChildColumnMeta {
pub parent_logical_col: usize,
pub physical_index: usize,
+ pub length_physical_index: usize,
+ pub role: ChildColumnRole,
pub element_field: Arc<Field>,
pub num_elements: usize,
}
@@ -117,6 +126,30 @@ impl BucketWriter {
&self.children
}
+ fn find_child_index(
+ &self,
+ parent_logical_col: usize,
+ length_physical_index: usize,
+ role: ChildColumnRole,
+ ) -> io::Result<usize> {
+ self.children
+ .iter()
+ .position(|c| {
+ c.parent_logical_col == parent_logical_col
+ && c.length_physical_index == length_physical_index
+ && c.role == role
+ })
+ .ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "missing complex child column: parent={}, length={},
role={:?}",
+ parent_logical_col, length_physical_index, role
+ ),
+ )
+ })
+ }
+
fn col_num_rows(&self, col: usize) -> usize {
if col < self.num_primary {
self.num_rows
@@ -157,9 +190,9 @@ impl BucketWriter {
struct ColSplit {
col: usize,
lengths: Int32Array,
- child_arrays: Vec<ArrayRef>, // 1 for List, 2 for Map (keys + values)
}
let mut splits: Vec<ColSplit> = Vec::new();
+ let mut pending: Vec<(usize, ArrayRef)> = Vec::new();
let mut seen_cols = Vec::new();
for child in &self.children {
let col = child.parent_logical_col;
@@ -178,11 +211,10 @@ impl BucketWriter {
})?;
let lengths = extract_list_lengths(list_array);
let values = flatten_list_values(list_array);
- splits.push(ColSplit {
- col,
- lengths,
- child_arrays: vec![values],
- });
+ splits.push(ColSplit { col, lengths });
+ let child_idx =
+ self.find_child_index(col, col, ChildColumnRole::ListElement)?;
+ pending.push((child_idx, values));
}
DataType::Map(_, _) => {
let map_array =
@@ -194,11 +226,11 @@ impl BucketWriter {
})?;
let lengths = extract_map_lengths(map_array);
let (keys, values) = flatten_map_entries(map_array);
- splits.push(ColSplit {
- col,
- lengths,
- child_arrays: vec![keys, values],
- });
+ splits.push(ColSplit { col, lengths });
+ let key_idx = self.find_child_index(col, col, ChildColumnRole::MapKey)?;
+ let value_idx = self.find_child_index(col, col, ChildColumnRole::MapValue)?;
+ pending.push((key_idx, keys));
+ pending.push((value_idx, values));
}
_ => {}
}
@@ -214,23 +246,7 @@ impl BucketWriter {
}
}
- // Write child columns — for List: 1 child (values), for Map: 2 children (keys, values)
- let mut pending: Vec<(usize, ArrayRef)> = Vec::new();
- for split in &splits {
- let parent_children: Vec<usize> = self
- .children
- .iter()
- .enumerate()
- .filter(|(_, c)| c.parent_logical_col == split.col)
- .map(|(idx, _)| idx)
- .collect();
- for (i, child_arr) in split.child_arrays.iter().enumerate() {
- if i < parent_children.len() {
- pending.push((parent_children[i], child_arr.clone()));
- }
- }
- }
-
+ // Write child columns. Nested children are located through explicit layout metadata.
while let Some((child_idx, values)) = pending.pop() {
if child_idx >= self.children.len() || values.is_empty() {
continue;
@@ -249,9 +265,12 @@ impl BucketWriter {
total_size +=
self.append_array_column(phys_idx, &inner_lengths,
&int32_dt, child_start)?;
self.children[child_idx].num_elements +=
inner_lengths.len();
- if child_idx + 1 < self.children.len() {
- pending.push((child_idx + 1, inner_values));
- }
+ let nested_idx = self.find_child_index(
+ self.children[child_idx].parent_logical_col,
+ phys_idx,
+ ChildColumnRole::ListElement,
+ )?;
+ pending.push((nested_idx, inner_values));
}
DataType::Map(_, _) => {
let inner_map =
@@ -263,13 +282,13 @@ impl BucketWriter {
total_size +=
self.append_array_column(phys_idx, &inner_lengths,
&int32_dt, child_start)?;
self.children[child_idx].num_elements +=
inner_lengths.len();
- // Queue keys and values for the next children
- if child_idx + 2 < self.children.len() {
- pending.push((child_idx + 2, inner_values));
- }
- if child_idx + 1 < self.children.len() {
- pending.push((child_idx + 1, inner_keys));
- }
+ let parent = self.children[child_idx].parent_logical_col;
+ let key_idx =
+ self.find_child_index(parent, phys_idx, ChildColumnRole::MapKey)?;
+ let value_idx =
+ self.find_child_index(parent, phys_idx, ChildColumnRole::MapValue)?;
+ pending.push((key_idx, inner_keys));
+ pending.push((value_idx, inner_values));
}
_ => {
let elem_dt =
self.children[child_idx].element_field.data_type().clone();
@@ -1449,27 +1468,47 @@ pub(crate) fn expand_col_types(col_types: &[&DataType]) -> (Vec<DataType>, Vec<C
let mut children = Vec::new();
for (i, t) in col_types.iter().enumerate() {
- expand_type(i, t, &mut physical_types, &mut children);
+ expand_container(i, i, t, &mut physical_types, &mut children);
}
(physical_types, children)
}
-fn expand_type(
+fn expand_container(
parent_logical: usize,
+ length_physical_index: usize,
dt: &DataType,
physical_types: &mut Vec<DataType>,
children: &mut Vec<ChildColumnMeta>,
) {
match dt {
DataType::List(element_field) => {
- expand_element(parent_logical, element_field, physical_types, children);
+ expand_element(
+ parent_logical,
+ length_physical_index,
+ ChildColumnRole::ListElement,
+ element_field,
+ physical_types,
+ children,
+ );
}
DataType::Map(entries_field, _) => {
if let DataType::Struct(fields) = entries_field.data_type() {
- // Keys child — recurse if key is complex
- expand_element(parent_logical, &fields[0], physical_types, children);
- // Values child — recurse if value is complex
- expand_element(parent_logical, &fields[1], physical_types, children);
+ expand_element(
+ parent_logical,
+ length_physical_index,
+ ChildColumnRole::MapKey,
+ &fields[0],
+ physical_types,
+ children,
+ );
+ expand_element(
+ parent_logical,
+ length_physical_index,
+ ChildColumnRole::MapValue,
+ &fields[1],
+ physical_types,
+ children,
+ );
}
}
_ => {}
@@ -1478,6 +1517,8 @@ fn expand_type(
fn expand_element(
parent_logical: usize,
+ length_physical_index: usize,
+ role: ChildColumnRole,
element_field: &Arc<Field>,
physical_types: &mut Vec<DataType>,
children: &mut Vec<ChildColumnMeta>,
@@ -1492,10 +1533,18 @@ fn expand_element(
children.push(ChildColumnMeta {
parent_logical_col: parent_logical,
physical_index: child_phys_idx,
+ length_physical_index,
+ role,
element_field: element_field.clone(),
num_elements: 0,
});
- expand_type(parent_logical, elem_dt, physical_types, children);
+ expand_container(
+ parent_logical,
+ child_phys_idx,
+ elem_dt,
+ physical_types,
+ children,
+ );
}
_ => {
// Primitive element: direct leaf column
@@ -1503,6 +1552,8 @@ fn expand_element(
children.push(ChildColumnMeta {
parent_logical_col: parent_logical,
physical_index: child_phys_idx,
+ length_physical_index,
+ role,
element_field: element_field.clone(),
num_elements: 0,
});
@@ -1559,6 +1610,53 @@ mod tests {
0
}
+ #[test]
+ fn test_expand_col_types_records_explicit_child_layout() {
+ let map_type = DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new(
+ "values",
+ DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+ true,
+ ),
+ ])),
+ false,
+ )),
+ false,
+ );
+ let list_type = DataType::List(Arc::new(Field::new("item", map_type, true)));
+ let col_refs = vec![&list_type];
+
+ let (physical_types, children) = expand_col_types(&col_refs);
+
+ assert_eq!(
+ physical_types,
+ vec![
+ DataType::Int32,
+ DataType::Int32,
+ DataType::Int32,
+ DataType::Int32,
+ DataType::Utf8,
+ ]
+ );
+ assert_eq!(children.len(), 4);
+ assert_eq!(children[0].role, ChildColumnRole::ListElement);
+ assert_eq!(children[0].physical_index, 1);
+ assert_eq!(children[0].length_physical_index, 0);
+ assert_eq!(children[1].role, ChildColumnRole::MapKey);
+ assert_eq!(children[1].physical_index, 2);
+ assert_eq!(children[1].length_physical_index, 1);
+ assert_eq!(children[2].role, ChildColumnRole::MapValue);
+ assert_eq!(children[2].physical_index, 3);
+ assert_eq!(children[2].length_physical_index, 1);
+ assert_eq!(children[3].role, ChildColumnRole::ListElement);
+ assert_eq!(children[3].physical_index, 4);
+ assert_eq!(children[3].length_physical_index, 3);
+ }
+
#[test]
fn test_all_null_encoding() {
let types = [DataType::Int32];
diff --git a/core/tests/array_type_test.rs b/core/tests/array_type_test.rs
index 53d9ba0..0a8c1b0 100644
--- a/core/tests/array_type_test.rs
+++ b/core/tests/array_type_test.rs
@@ -1197,6 +1197,121 @@ fn test_map_with_array_value() {
assert_eq!(v1.value(0), 3);
}
+#[test]
+fn test_array_of_map_with_array_value() {
+ // ARRAY<MAP<INT32, ARRAY<UTF8>>>
+ let list_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
+ let map_type = DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(arrow_schema::Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", list_type.clone(), true),
+ ])),
+ false,
+ )),
+ false,
+ );
+ let schema = Schema::new(vec![Field::new(
+ "col",
+ DataType::List(Arc::new(Field::new("item", map_type.clone(), true))),
+ true,
+ )]);
+
+ let key_builder = Int32Builder::new();
+ let value_builder = ListBuilder::new(StringBuilder::new());
+ let map_builder = MapBuilder::new(None, key_builder, value_builder);
+ let mut list_builder = ListBuilder::new(map_builder);
+
+ // row 0: [{1:["a","b"], 2:["c"]}, {3:[]}]
+ list_builder.values().keys().append_value(1);
+ list_builder.values().values().values().append_value("a");
+ list_builder.values().values().values().append_value("b");
+ list_builder.values().values().append(true);
+ list_builder.values().keys().append_value(2);
+ list_builder.values().values().values().append_value("c");
+ list_builder.values().values().append(true);
+ list_builder.values().append(true).unwrap();
+
+ list_builder.values().keys().append_value(3);
+ list_builder.values().values().append(true);
+ list_builder.values().append(true).unwrap();
+ list_builder.append(true);
+
+ // row 1: null
+ list_builder.append(false);
+
+ // row 2: [{4:null}]
+ list_builder.values().keys().append_value(4);
+ list_builder.values().values().append(false);
+ list_builder.values().append(true).unwrap();
+ list_builder.append(true);
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(list_builder.finish())],
+ )
+ .unwrap();
+
+ let result = roundtrip(&schema, &[batch]);
+ let col = result[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ assert_eq!(col.len(), 3);
+ assert!(!col.is_null(0));
+ assert!(col.is_null(1));
+ assert!(!col.is_null(2));
+
+ let row0 = col.value(0);
+ let maps0 = row0.as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(maps0.len(), 2);
+ assert_eq!(maps0.value_length(0), 2);
+ assert_eq!(maps0.value_length(1), 1);
+ let base0 = maps0.value_offsets()[0] as usize;
+ let keys0 = maps0.keys().as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(keys0.value(base0), 1);
+ assert_eq!(keys0.value(base0 + 1), 2);
+ assert_eq!(keys0.value(base0 + 2), 3);
+
+ let values0 = maps0.values().as_any().downcast_ref::<ListArray>().unwrap();
+ let v0 = values0
+ .value(base0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .clone();
+ assert_eq!(v0.len(), 2);
+ assert_eq!(v0.value(0), "a");
+ assert_eq!(v0.value(1), "b");
+ let v1 = values0
+ .value(base0 + 1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .clone();
+ assert_eq!(v1.len(), 1);
+ assert_eq!(v1.value(0), "c");
+ let v2 = values0
+ .value(base0 + 2)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .clone();
+ assert_eq!(v2.len(), 0);
+
+ let row2 = col.value(2);
+ let maps2 = row2.as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(maps2.len(), 1);
+ assert_eq!(maps2.value_length(0), 1);
+ let base2 = maps2.value_offsets()[0] as usize;
+ let keys2 = maps2.keys().as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(keys2.value(base2), 4);
+ let values2 = maps2.values().as_any().downcast_ref::<ListArray>().unwrap();
+ assert!(values2.is_null(base2));
+}
+
// ======================== MAP Schema Validation Tests ========================
#[test]