This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 961e74f  Refactor complex child column layout metadata (#55)
961e74f is described below

commit 961e74fe128dbd800a30c80cd9681180d57966a1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 17:31:11 2026 +0800

    Refactor complex child column layout metadata (#55)
---
 core/src/bucket_reader.rs     | 192 +++++++++++++++---------------------------
 core/src/bucket_writer.rs     | 192 +++++++++++++++++++++++++++++++-----------
 core/tests/array_type_test.rs | 115 +++++++++++++++++++++++++
 3 files changed, 329 insertions(+), 170 deletions(-)

diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index e45b9fa..2196fe2 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -368,135 +368,81 @@ fn reassemble_list_columns(
             continue;
         }
         let child = &children[idx];
-        let parent = child.parent_logical_col;
-        let phys_idx = child.physical_index;
-        let elem_dt = child.element_field.data_type();
-
-        // What type is above us (the container that produced this child)?
-        // Look at the element_field of the child that stores lengths for us,
-        // or the logical type at the parent position.
-        // For a first-level child, the lengths are at `parent` (primary column).
-        // For a nested child (e.g., inner values of ARRAY<ARRAY<INT>>),
-        // the lengths are at the previous sibling that was already reassembled.
-        //
-        // Key insight: the container type that produced this child determines
-        // whether it's an ARRAY child (1 child) or MAP child (2 children).
-        // We detect MAP by checking if the NEXT child (idx+1) has the same
-        // "lengths column" — meaning they're siblings of the same MAP.
-
-        // Determine if this child is part of a MAP pair.
-        // MAP produces two consecutive children: keys then values.
-        // Nested ARRAY produces children where the first child's element_field
-        // has a complex data_type (List/Map), indicating a deeper level.
-        //
-        // A "MAP sibling pair" is: two consecutive children with same parent,
-        // where the first child's element_field is NOT itself a complex type
-        // (complex element_field means it's a nested ARRAY/MAP intermediate).
-
-        let prev_is_map_key = idx > 0
-            && children[idx - 1].parent_logical_col == parent
-            && !processed[idx - 1]
-            && !matches!(
-                children[idx - 1].element_field.data_type(),
-                DataType::List(_) | DataType::Map(_, _)
-            );
-
-        if prev_is_map_key {
-            // This child is the VALUES of a MAP (the keys are at idx-1)
-            continue;
-        }
-
-        let next_is_map_value = idx + 1 < children.len()
-            && children[idx + 1].parent_logical_col == parent
-            && !processed[idx + 1]
-            && !matches!(
-                child.element_field.data_type(),
-                DataType::List(_) | DataType::Map(_, _)
-            );
-
-        if next_is_map_value {
-            // MAP: this child is KEYS, next child is VALUES
-            let val_child = &children[idx + 1];
-            let keys = arrays[phys_idx].clone();
-            let values = arrays[val_child.physical_index].clone();
-
-            // Find lengths column: a child with complex element_field at phys_idx-1
-            // stores the MAP lengths (it was produced by expand_element for a Map element).
-            let prev_child_is_lengths = children.iter().any(|c| {
-                c.physical_index == phys_idx - 1
-                    && c.parent_logical_col == parent
-                    && matches!(
-                        c.element_field.data_type(),
-                        DataType::List(_) | DataType::Map(_, _)
-                    )
-            });
-            let lengths_idx = if prev_child_is_lengths {
-                phys_idx - 1
-            } else {
-                parent
-            };
-
-            let lengths = arrays[lengths_idx].clone();
-            let lengths_rows = lengths.len();
+        match child.role {
+            ChildColumnRole::MapValue => {
+                let key_idx = children.iter().position(|candidate| {
+                    candidate.parent_logical_col == child.parent_logical_col
+                        && candidate.length_physical_index == child.length_physical_index
+                        && candidate.role == ChildColumnRole::MapKey
+                });
+                let Some(key_idx) = key_idx else {
+                    continue;
+                };
+                if processed[key_idx] {
+                    continue;
+                }
 
-            // Reconstruct the MAP type from the element_field
-            // The child at this position has element_field = key Field
-            // We need the full MAP entries_field. Get it from the container.
-            let container_dt = if lengths_idx < logical_types.len() {
-                &logical_types[lengths_idx]
-            } else {
-                // Nested: the reassembled array at lengths_idx should already have
-                // the right type, but we need the MAP descriptor.
-                // Use the child's element_field to reconstruct.
-                elem_dt
-            };
+                let key_child = &children[key_idx];
+                let lengths_idx = child.length_physical_index;
+                let lengths = arrays[lengths_idx].clone();
+                let keys = arrays[key_child.physical_index].clone();
+                let values = arrays[child.physical_index].clone();
+                let lengths_rows = lengths.len();
+
+                let container_dt = if lengths_idx < logical_types.len() {
+                    &logical_types[lengths_idx]
+                } else if let Some(length_child) =
+                    children.iter().find(|c| c.physical_index == lengths_idx)
+                {
+                    length_child.element_field.data_type()
+                } else {
+                    key_child.element_field.data_type()
+                };
 
-            if let DataType::Map(entries_field, sorted) = container_dt {
-                arrays[lengths_idx] = reassemble_map_array(
-                    lengths,
-                    keys,
-                    values,
-                    entries_field,
-                    *sorted,
-                    lengths_rows,
-                );
-            } else {
-                // Reconstruct MAP from key/value fields
-                let key_field = child.element_field.clone();
-                let val_field = val_child.element_field.clone();
-                let entries_field = Arc::new(Field::new(
-                    "entries",
-                    DataType::Struct(arrow_schema::Fields::from(vec![
-                        key_field.as_ref().clone(),
-                        val_field.as_ref().clone(),
-                    ])),
-                    false,
-                ));
-                arrays[lengths_idx] = reassemble_map_array(
+                if let DataType::Map(entries_field, sorted) = container_dt {
+                    arrays[lengths_idx] = reassemble_map_array(
+                        lengths,
+                        keys,
+                        values,
+                        entries_field,
+                        *sorted,
+                        lengths_rows,
+                    );
+                } else {
+                    let entries_field = Arc::new(Field::new(
+                        "entries",
+                        DataType::Struct(arrow_schema::Fields::from(vec![
+                            key_child.element_field.as_ref().clone(),
+                            child.element_field.as_ref().clone(),
+                        ])),
+                        false,
+                    ));
+                    arrays[lengths_idx] = reassemble_map_array(
+                        lengths,
+                        keys,
+                        values,
+                        &entries_field,
+                        false,
+                        lengths_rows,
+                    );
+                }
+                processed[idx] = true;
+                processed[key_idx] = true;
+            }
+            ChildColumnRole::MapKey => {}
+            ChildColumnRole::ListElement => {
+                let lengths_idx = child.length_physical_index;
+                let lengths = arrays[lengths_idx].clone();
+                let values = arrays[child.physical_index].clone();
+                let lengths_rows = lengths.len();
+                arrays[lengths_idx] = reassemble_list_array(
                     lengths,
-                    keys,
                     values,
-                    &entries_field,
-                    false,
+                    child.element_field.clone(),
                     lengths_rows,
                 );
+                processed[idx] = true;
             }
-            processed[idx] = true;
-            processed[idx + 1] = true;
-        } else {
-            // ARRAY: single child for values
-            let values = arrays[phys_idx].clone();
-            let is_nested = children
-                .iter()
-                .any(|c| c.physical_index == phys_idx - 1 && c.parent_logical_col == parent);
-            let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
-
-            let lengths = arrays[lengths_idx].clone();
-            let lengths_rows = lengths.len();
-            let element_field = child.element_field.clone();
-            arrays[lengths_idx] =
-                reassemble_list_array(lengths, values, element_field, lengths_rows);
-            processed[idx] = true;
         }
     }
 
@@ -580,7 +526,7 @@ fn reassemble_list_array(
     Arc::new(ListArray::new(element_field, offset_buf, values, null_buf))
 }
 
-use crate::bucket_writer::{expand_col_types, ChildColumnMeta};
+use crate::bucket_writer::{expand_col_types, ChildColumnMeta, ChildColumnRole};
 
 pub struct BucketReader {
     data: Vec<u8>,
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index a826bb6..487a1a3 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -37,10 +37,19 @@ pub struct PagedBucketOutput {
     pub children: Vec<ChildColumnMeta>,
 }
 
-#[derive(Clone)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum ChildColumnRole {
+    ListElement,
+    MapKey,
+    MapValue,
+}
+
+#[derive(Clone, Debug)]
 pub struct ChildColumnMeta {
     pub parent_logical_col: usize,
     pub physical_index: usize,
+    pub length_physical_index: usize,
+    pub role: ChildColumnRole,
     pub element_field: Arc<Field>,
     pub num_elements: usize,
 }
@@ -117,6 +126,30 @@ impl BucketWriter {
         &self.children
     }
 
+    fn find_child_index(
+        &self,
+        parent_logical_col: usize,
+        length_physical_index: usize,
+        role: ChildColumnRole,
+    ) -> io::Result<usize> {
+        self.children
+            .iter()
+            .position(|c| {
+                c.parent_logical_col == parent_logical_col
+                    && c.length_physical_index == length_physical_index
+                    && c.role == role
+            })
+            .ok_or_else(|| {
+                io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    format!(
+                        "missing complex child column: parent={}, length={}, role={:?}",
+                        parent_logical_col, length_physical_index, role
+                    ),
+                )
+            })
+    }
+
     fn col_num_rows(&self, col: usize) -> usize {
         if col < self.num_primary {
             self.num_rows
@@ -157,9 +190,9 @@ impl BucketWriter {
         struct ColSplit {
             col: usize,
             lengths: Int32Array,
-            child_arrays: Vec<ArrayRef>, // 1 for List, 2 for Map (keys + values)
         }
         let mut splits: Vec<ColSplit> = Vec::new();
+        let mut pending: Vec<(usize, ArrayRef)> = Vec::new();
         let mut seen_cols = Vec::new();
         for child in &self.children {
             let col = child.parent_logical_col;
@@ -178,11 +211,10 @@ impl BucketWriter {
                         })?;
                     let lengths = extract_list_lengths(list_array);
                     let values = flatten_list_values(list_array);
-                    splits.push(ColSplit {
-                        col,
-                        lengths,
-                        child_arrays: vec![values],
-                    });
+                    splits.push(ColSplit { col, lengths });
+                    let child_idx =
+                        self.find_child_index(col, col, ChildColumnRole::ListElement)?;
+                    pending.push((child_idx, values));
                 }
                 DataType::Map(_, _) => {
                     let map_array =
@@ -194,11 +226,11 @@ impl BucketWriter {
                             })?;
                     let lengths = extract_map_lengths(map_array);
                     let (keys, values) = flatten_map_entries(map_array);
-                    splits.push(ColSplit {
-                        col,
-                        lengths,
-                        child_arrays: vec![keys, values],
-                    });
+                    splits.push(ColSplit { col, lengths });
+                    let key_idx = self.find_child_index(col, col, ChildColumnRole::MapKey)?;
+                    let value_idx = self.find_child_index(col, col, ChildColumnRole::MapValue)?;
+                    pending.push((key_idx, keys));
+                    pending.push((value_idx, values));
                 }
                 _ => {}
             }
@@ -214,23 +246,7 @@ impl BucketWriter {
             }
         }
 
-        // Write child columns — for List: 1 child (values), for Map: 2 children (keys, values)
-        let mut pending: Vec<(usize, ArrayRef)> = Vec::new();
-        for split in &splits {
-            let parent_children: Vec<usize> = self
-                .children
-                .iter()
-                .enumerate()
-                .filter(|(_, c)| c.parent_logical_col == split.col)
-                .map(|(idx, _)| idx)
-                .collect();
-            for (i, child_arr) in split.child_arrays.iter().enumerate() {
-                if i < parent_children.len() {
-                    pending.push((parent_children[i], child_arr.clone()));
-                }
-            }
-        }
-
+        // Write child columns. Nested children are located through explicit layout metadata.
         while let Some((child_idx, values)) = pending.pop() {
             if child_idx >= self.children.len() || values.is_empty() {
                 continue;
@@ -249,9 +265,12 @@ impl BucketWriter {
                     total_size +=
                         self.append_array_column(phys_idx, &inner_lengths, &int32_dt, child_start)?;
                     self.children[child_idx].num_elements += inner_lengths.len();
-                    if child_idx + 1 < self.children.len() {
-                        pending.push((child_idx + 1, inner_values));
-                    }
+                    let nested_idx = self.find_child_index(
+                        self.children[child_idx].parent_logical_col,
+                        phys_idx,
+                        ChildColumnRole::ListElement,
+                    )?;
+                    pending.push((nested_idx, inner_values));
                 }
                 DataType::Map(_, _) => {
                     let inner_map =
@@ -263,13 +282,13 @@ impl BucketWriter {
                     total_size +=
                         self.append_array_column(phys_idx, &inner_lengths, &int32_dt, child_start)?;
                     self.children[child_idx].num_elements += inner_lengths.len();
-                    // Queue keys and values for the next children
-                    if child_idx + 2 < self.children.len() {
-                        pending.push((child_idx + 2, inner_values));
-                    }
-                    if child_idx + 1 < self.children.len() {
-                        pending.push((child_idx + 1, inner_keys));
-                    }
+                    let parent = self.children[child_idx].parent_logical_col;
+                    let key_idx =
+                        self.find_child_index(parent, phys_idx, ChildColumnRole::MapKey)?;
+                    let value_idx =
+                        self.find_child_index(parent, phys_idx, ChildColumnRole::MapValue)?;
+                    pending.push((key_idx, inner_keys));
+                    pending.push((value_idx, inner_values));
                 }
                 _ => {
                     let elem_dt = self.children[child_idx].element_field.data_type().clone();
@@ -1449,27 +1468,47 @@ pub(crate) fn expand_col_types(col_types: &[&DataType]) -> (Vec<DataType>, Vec<C
     let mut children = Vec::new();
 
     for (i, t) in col_types.iter().enumerate() {
-        expand_type(i, t, &mut physical_types, &mut children);
+        expand_container(i, i, t, &mut physical_types, &mut children);
     }
     (physical_types, children)
 }
 
-fn expand_type(
+fn expand_container(
     parent_logical: usize,
+    length_physical_index: usize,
     dt: &DataType,
     physical_types: &mut Vec<DataType>,
     children: &mut Vec<ChildColumnMeta>,
 ) {
     match dt {
         DataType::List(element_field) => {
-            expand_element(parent_logical, element_field, physical_types, children);
+            expand_element(
+                parent_logical,
+                length_physical_index,
+                ChildColumnRole::ListElement,
+                element_field,
+                physical_types,
+                children,
+            );
         }
         DataType::Map(entries_field, _) => {
             if let DataType::Struct(fields) = entries_field.data_type() {
-                // Keys child — recurse if key is complex
-                expand_element(parent_logical, &fields[0], physical_types, children);
-                // Values child — recurse if value is complex
-                expand_element(parent_logical, &fields[1], physical_types, children);
+                expand_element(
+                    parent_logical,
+                    length_physical_index,
+                    ChildColumnRole::MapKey,
+                    &fields[0],
+                    physical_types,
+                    children,
+                );
+                expand_element(
+                    parent_logical,
+                    length_physical_index,
+                    ChildColumnRole::MapValue,
+                    &fields[1],
+                    physical_types,
+                    children,
+                );
             }
         }
         _ => {}
@@ -1478,6 +1517,8 @@ fn expand_type(
 
 fn expand_element(
     parent_logical: usize,
+    length_physical_index: usize,
+    role: ChildColumnRole,
     element_field: &Arc<Field>,
     physical_types: &mut Vec<DataType>,
     children: &mut Vec<ChildColumnMeta>,
@@ -1492,10 +1533,18 @@ fn expand_element(
             children.push(ChildColumnMeta {
                 parent_logical_col: parent_logical,
                 physical_index: child_phys_idx,
+                length_physical_index,
+                role,
                 element_field: element_field.clone(),
                 num_elements: 0,
             });
-            expand_type(parent_logical, elem_dt, physical_types, children);
+            expand_container(
+                parent_logical,
+                child_phys_idx,
+                elem_dt,
+                physical_types,
+                children,
+            );
         }
         _ => {
             // Primitive element: direct leaf column
@@ -1503,6 +1552,8 @@ fn expand_element(
             children.push(ChildColumnMeta {
                 parent_logical_col: parent_logical,
                 physical_index: child_phys_idx,
+                length_physical_index,
+                role,
                 element_field: element_field.clone(),
                 num_elements: 0,
             });
@@ -1559,6 +1610,53 @@ mod tests {
         0
     }
 
+    #[test]
+    fn test_expand_col_types_records_explicit_child_layout() {
+        let map_type = DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(arrow_schema::Fields::from(vec![
+                    Field::new("keys", DataType::Int32, false),
+                    Field::new(
+                        "values",
+                        DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+                        true,
+                    ),
+                ])),
+                false,
+            )),
+            false,
+        );
+        let list_type = DataType::List(Arc::new(Field::new("item", map_type, true)));
+        let col_refs = vec![&list_type];
+
+        let (physical_types, children) = expand_col_types(&col_refs);
+
+        assert_eq!(
+            physical_types,
+            vec![
+                DataType::Int32,
+                DataType::Int32,
+                DataType::Int32,
+                DataType::Int32,
+                DataType::Utf8,
+            ]
+        );
+        assert_eq!(children.len(), 4);
+        assert_eq!(children[0].role, ChildColumnRole::ListElement);
+        assert_eq!(children[0].physical_index, 1);
+        assert_eq!(children[0].length_physical_index, 0);
+        assert_eq!(children[1].role, ChildColumnRole::MapKey);
+        assert_eq!(children[1].physical_index, 2);
+        assert_eq!(children[1].length_physical_index, 1);
+        assert_eq!(children[2].role, ChildColumnRole::MapValue);
+        assert_eq!(children[2].physical_index, 3);
+        assert_eq!(children[2].length_physical_index, 1);
+        assert_eq!(children[3].role, ChildColumnRole::ListElement);
+        assert_eq!(children[3].physical_index, 4);
+        assert_eq!(children[3].length_physical_index, 3);
+    }
+
     #[test]
     fn test_all_null_encoding() {
         let types = [DataType::Int32];
diff --git a/core/tests/array_type_test.rs b/core/tests/array_type_test.rs
index 53d9ba0..0a8c1b0 100644
--- a/core/tests/array_type_test.rs
+++ b/core/tests/array_type_test.rs
@@ -1197,6 +1197,121 @@ fn test_map_with_array_value() {
     assert_eq!(v1.value(0), 3);
 }
 
+#[test]
+fn test_array_of_map_with_array_value() {
+    // ARRAY<MAP<INT32, ARRAY<UTF8>>>
+    let list_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
+    let map_type = DataType::Map(
+        Arc::new(Field::new(
+            "entries",
+            DataType::Struct(arrow_schema::Fields::from(vec![
+                Field::new("keys", DataType::Int32, false),
+                Field::new("values", list_type.clone(), true),
+            ])),
+            false,
+        )),
+        false,
+    );
+    let schema = Schema::new(vec![Field::new(
+        "col",
+        DataType::List(Arc::new(Field::new("item", map_type.clone(), true))),
+        true,
+    )]);
+
+    let key_builder = Int32Builder::new();
+    let value_builder = ListBuilder::new(StringBuilder::new());
+    let map_builder = MapBuilder::new(None, key_builder, value_builder);
+    let mut list_builder = ListBuilder::new(map_builder);
+
+    // row 0: [{1:["a","b"], 2:["c"]}, {3:[]}]
+    list_builder.values().keys().append_value(1);
+    list_builder.values().values().values().append_value("a");
+    list_builder.values().values().values().append_value("b");
+    list_builder.values().values().append(true);
+    list_builder.values().keys().append_value(2);
+    list_builder.values().values().values().append_value("c");
+    list_builder.values().values().append(true);
+    list_builder.values().append(true).unwrap();
+
+    list_builder.values().keys().append_value(3);
+    list_builder.values().values().append(true);
+    list_builder.values().append(true).unwrap();
+    list_builder.append(true);
+
+    // row 1: null
+    list_builder.append(false);
+
+    // row 2: [{4:null}]
+    list_builder.values().keys().append_value(4);
+    list_builder.values().values().append(false);
+    list_builder.values().append(true).unwrap();
+    list_builder.append(true);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(list_builder.finish())],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(col.len(), 3);
+    assert!(!col.is_null(0));
+    assert!(col.is_null(1));
+    assert!(!col.is_null(2));
+
+    let row0 = col.value(0);
+    let maps0 = row0.as_any().downcast_ref::<MapArray>().unwrap();
+    assert_eq!(maps0.len(), 2);
+    assert_eq!(maps0.value_length(0), 2);
+    assert_eq!(maps0.value_length(1), 1);
+    let base0 = maps0.value_offsets()[0] as usize;
+    let keys0 = maps0.keys().as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(keys0.value(base0), 1);
+    assert_eq!(keys0.value(base0 + 1), 2);
+    assert_eq!(keys0.value(base0 + 2), 3);
+
+    let values0 = maps0.values().as_any().downcast_ref::<ListArray>().unwrap();
+    let v0 = values0
+        .value(base0)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap()
+        .clone();
+    assert_eq!(v0.len(), 2);
+    assert_eq!(v0.value(0), "a");
+    assert_eq!(v0.value(1), "b");
+    let v1 = values0
+        .value(base0 + 1)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap()
+        .clone();
+    assert_eq!(v1.len(), 1);
+    assert_eq!(v1.value(0), "c");
+    let v2 = values0
+        .value(base0 + 2)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap()
+        .clone();
+    assert_eq!(v2.len(), 0);
+
+    let row2 = col.value(2);
+    let maps2 = row2.as_any().downcast_ref::<MapArray>().unwrap();
+    assert_eq!(maps2.len(), 1);
+    assert_eq!(maps2.value_length(0), 1);
+    let base2 = maps2.value_offsets()[0] as usize;
+    let keys2 = maps2.keys().as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(keys2.value(base2), 4);
+    let values2 = maps2.values().as_any().downcast_ref::<ListArray>().unwrap();
+    assert!(values2.is_null(base2));
+}
+
 // ======================== MAP Schema Validation Tests ========================
 
 #[test]

Reply via email to