This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 9949226f0a perf(parquet): LevelInfoBuilder batch write when no 
repetition childs (#10037)
9949226f0a is described below

commit 9949226f0ab644c701e6ed283db32989bbf6b006
Author: mwish <[email protected]>
AuthorDate: Wed Jun 3 21:33:22 2026 +0800

    perf(parquet): LevelInfoBuilder batch write when no repetition childs 
(#10037)
    
    # Which issue does this PR close?
    
    - Closes #10023 .
    
    # Rationale for this change
    
    The Parquet writer writes list elements one by one, which is extremely
    slow. This patch batches the writes.
    
    # What changes are included in this PR?
    
    Batches writes when writing a list at the maximum repetition level.
    
    # Are these changes tested?
    
    Covered by existing tests.
    
    # Are there any user-facing changes?
    
    No
---
 parquet/src/arrow/arrow_writer/levels.rs | 191 +++++++++++++++++++++++++++++--
 1 file changed, 180 insertions(+), 11 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/levels.rs 
b/parquet/src/arrow/arrow_writer/levels.rs
index 10f90f707c..e577cf73a8 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -118,6 +118,7 @@ enum LevelInfoBuilder {
         LevelContext,          // Context
         OffsetBuffer<i32>,     // Offsets
         Option<NullBuffer>,    // Nulls
+        bool,                  // is_last_level (child has no nested rep)
     ),
     /// A large list array
     LargeList(
@@ -125,6 +126,7 @@ enum LevelInfoBuilder {
         LevelContext,          // Context
         OffsetBuffer<i64>,     // Offsets
         Option<NullBuffer>,    // Nulls
+        bool,                  // is_last_level (child has no nested rep)
     ),
     /// A fixed size list array
     FixedSizeList(
@@ -223,22 +225,31 @@ impl LevelInfoBuilder {
                     DataType::List(_) => {
                         let list = array.as_list();
                         let child = Self::try_new(child.as_ref(), ctx, 
list.values())?;
+                        let is_last = child.child_has_no_nested_rep();
                         let offsets = list.offsets().clone();
-                        Self::List(Box::new(child), ctx, offsets, 
list.nulls().cloned())
+                        Self::List(
+                            Box::new(child),
+                            ctx,
+                            offsets,
+                            list.nulls().cloned(),
+                            is_last,
+                        )
                     }
                     DataType::LargeList(_) => {
                         let list = array.as_list();
                         let child = Self::try_new(child.as_ref(), ctx, 
list.values())?;
+                        let is_last = child.child_has_no_nested_rep();
                         let offsets = list.offsets().clone();
                         let nulls = list.nulls().cloned();
-                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
+                        Self::LargeList(Box::new(child), ctx, offsets, nulls, 
is_last)
                     }
                     DataType::Map(_, _) => {
                         let map = array.as_map();
                         let entries = Arc::new(map.entries().clone()) as 
ArrayRef;
                         let child = Self::try_new(child.as_ref(), ctx, 
&entries)?;
+                        let is_last = child.child_has_no_nested_rep();
                         let offsets = map.offsets().clone();
-                        Self::List(Box::new(child), ctx, offsets, 
map.nulls().cloned())
+                        Self::List(Box::new(child), ctx, offsets, 
map.nulls().cloned(), is_last)
                     }
                     DataType::FixedSizeList(_, size) => {
                         let list = array.as_fixed_size_list();
@@ -274,8 +285,8 @@ impl LevelInfoBuilder {
     fn finish(self) -> Vec<ArrayLevels> {
         match self {
             LevelInfoBuilder::Primitive(v) => vec![v],
-            LevelInfoBuilder::List(v, _, _, _)
-            | LevelInfoBuilder::LargeList(v, _, _, _)
+            LevelInfoBuilder::List(v, _, _, _, _)
+            | LevelInfoBuilder::LargeList(v, _, _, _, _)
             | LevelInfoBuilder::FixedSizeList(v, _, _, _)
             | LevelInfoBuilder::ListView(v, _, _, _, _)
             | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
@@ -287,11 +298,11 @@ impl LevelInfoBuilder {
     fn write(&mut self, range: Range<usize>) {
         match self {
             LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
-            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
-                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
+            LevelInfoBuilder::List(child, ctx, offsets, nulls, is_last) => {
+                Self::write_list(child, ctx, offsets, nulls.as_ref(), range, 
*is_last)
             }
-            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
-                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
+            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls, is_last) 
=> {
+                Self::write_list(child, ctx, offsets, nulls.as_ref(), range, 
*is_last)
             }
             LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
                 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), 
range)
@@ -308,6 +319,19 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Returns `true` if the child contains no nested repetition levels, 
meaning
+    /// each child element produces exactly one rep_level entry in the leaf.
+    /// This is true for `Primitive` children and `Struct` trees with no list 
descendants.
+    fn child_has_no_nested_rep(&self) -> bool {
+        match self {
+            LevelInfoBuilder::Primitive(_) => true,
+            LevelInfoBuilder::Struct(children, _, _) => {
+                children.iter().all(|c| c.child_has_no_nested_rep())
+            }
+            _ => false,
+        }
+    }
+
     /// Write `range` elements from ListArray `array`
     ///
     /// Note: MapArrays are `ListArray<i32>` under the hood and so are 
dispatched to this method
@@ -317,6 +341,7 @@ impl LevelInfoBuilder {
         offsets: &[O],
         nulls: Option<&NullBuffer>,
         range: Range<usize>,
+        is_last_level: bool,
     ) {
         // Fast path: entire list array is null; emit bulk null rep/def levels
         if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
@@ -327,6 +352,18 @@ impl LevelInfoBuilder {
             return;
         }
 
+        // Fast path for "last-level list": when the child has no nested 
rep_levels,
+        // each child element produces exactly one rep_level entry. We can 
batch
+        // contiguous non-empty list slots into a single child.write() call, 
then
+        // fix up the rep_levels at list-slot boundaries using offsets 
directly.
+        //
+        // Kept as a separate function so the compiler can optimize 
write_list's
+        // hot loop independently (function body size affects codegen quality).
+        if is_last_level {
+            Self::write_list_last_level(child, ctx, offsets, nulls, range);
+            return;
+        }
+
         let offsets = &offsets[range.start..range.end + 1];
 
         let write_non_null_slice =
@@ -427,6 +464,138 @@ impl LevelInfoBuilder {
         }
     }
 
+    /// Optimized write path for lists whose child has no nested repetition 
levels.
+    ///
+    /// When the child is a leaf (or a struct of leaves), each child element 
maps to
+    /// exactly one rep_level entry. This lets us batch contiguous non-empty 
list
+    /// slots into a single `child.write()` call, then stamp the list-start 
markers
+    /// at positions computed directly from offsets — avoiding per-slot 
`write` +
+    /// reverse-scan overhead.
+    fn write_list_last_level<O: OffsetSizeTrait>(
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
+        offsets: &[O],
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
+        let null_offset = range.start;
+        let offsets = &offsets[range.start..range.end + 1];
+        let list_start_rep = ctx.rep_level - 1;
+
+        let emit_nulls = |child: &mut LevelInfoBuilder, count: usize| {
+            child.visit_leaves(|leaf| {
+                leaf.append_rep_level_run(list_start_rep, count);
+                leaf.append_def_level_run(ctx.def_level - 2, count);
+            });
+        };
+
+        let emit_empties = |child: &mut LevelInfoBuilder, count: usize| {
+            child.visit_leaves(|leaf| {
+                leaf.append_rep_level_run(list_start_rep, count);
+                leaf.append_def_level_run(ctx.def_level - 1, count);
+            });
+        };
+
+        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: 
&[O]| {
+            debug_assert!(run_offsets.len() >= 2);
+            let values_start = run_offsets[0].as_usize();
+            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+            debug_assert!(values_end > values_start);
+
+            // Write all leaf values in one batch. Since the child has no 
nested
+            // rep, this emits (values_end - values_start) rep_levels all equal
+            // to ctx.rep_level (= "continuation within list").
+            child.write(values_start..values_end);
+
+            // The first element of each list slot needs rep_level =
+            // list_start_rep to mark a new list boundary. Because there's a 
1:1
+            // mapping between child elements and rep_level entries, the 
position
+            // of each slot's first element is directly computable from 
offsets.
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+                let batch_len = values_end - values_start;
+                let batch_base = rep_levels.len() - batch_len;
+
+                for slot_offset in run_offsets.iter().take(run_offsets.len() - 
1) {
+                    let list_start_pos = batch_base + (slot_offset.as_usize() 
- values_start);
+                    rep_levels[list_start_pos] = list_start_rep;
+                }
+            });
+        };
+
+        // Classify each slot, detect run boundaries, flush on transition.
+        #[derive(Clone, Copy, PartialEq)]
+        enum SlotKind {
+            Null,
+            Empty,
+            NonEmpty,
+        }
+
+        let num_slots = offsets.len() - 1;
+        if num_slots == 0 {
+            return;
+        }
+
+        macro_rules! classify {
+            ($i:expr, $nulls:expr) => {
+                if !$nulls.is_valid($i + null_offset) {
+                    SlotKind::Null
+                } else if offsets[$i] == offsets[$i + 1] {
+                    SlotKind::Empty
+                } else {
+                    SlotKind::NonEmpty
+                }
+            };
+        }
+
+        macro_rules! flush_run {
+            ($kind:expr, $start:expr, $end:expr) => {
+                match $kind {
+                    SlotKind::Null => emit_nulls(child, $end - $start),
+                    SlotKind::Empty => emit_empties(child, $end - $start),
+                    SlotKind::NonEmpty => emit_non_empty_run(child, 
&offsets[$start..$end + 1]),
+                }
+            };
+        }
+
+        match nulls {
+            Some(nulls) => {
+                let mut run_kind = classify!(0, nulls);
+                let mut run_start: usize = 0;
+                for i in 1..num_slots {
+                    let kind = classify!(i, nulls);
+                    if kind != run_kind {
+                        flush_run!(run_kind, run_start, i);
+                        run_kind = kind;
+                        run_start = i;
+                    }
+                }
+                flush_run!(run_kind, run_start, num_slots);
+            }
+            None => {
+                let mut run_kind = if offsets[0] == offsets[1] {
+                    SlotKind::Empty
+                } else {
+                    SlotKind::NonEmpty
+                };
+                let mut run_start: usize = 0;
+                for i in 1..num_slots {
+                    let kind = if offsets[i] == offsets[i + 1] {
+                        SlotKind::Empty
+                    } else {
+                        SlotKind::NonEmpty
+                    };
+                    if kind != run_kind {
+                        flush_run!(run_kind, run_start, i);
+                        run_kind = kind;
+                        run_start = i;
+                    }
+                }
+                flush_run!(run_kind, run_start, num_slots);
+            }
+        }
+    }
+
     /// Write `range` elements from ListViewArray `array`
     fn write_list_view<O: OffsetSizeTrait>(
         child: &mut LevelInfoBuilder,
@@ -734,8 +903,8 @@ impl LevelInfoBuilder {
     fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
         match self {
             LevelInfoBuilder::Primitive(info) => visit(info),
-            LevelInfoBuilder::List(c, _, _, _)
-            | LevelInfoBuilder::LargeList(c, _, _, _)
+            LevelInfoBuilder::List(c, _, _, _, _)
+            | LevelInfoBuilder::LargeList(c, _, _, _, _)
             | LevelInfoBuilder::FixedSizeList(c, _, _, _)
             | LevelInfoBuilder::ListView(c, _, _, _, _)
             | LevelInfoBuilder::LargeListView(c, _, _, _, _) => 
c.visit_leaves(visit),

Reply via email to