mapleFU commented on code in PR #10085:
URL: https://github.com/apache/arrow-rs/pull/10085#discussion_r3367081856


##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -352,131 +352,128 @@ impl LevelInfoBuilder {
             return;
         }
 
-        // Fast path for "last-level list": when the child has no nested 
rep_levels,
-        // each child element produces exactly one rep_level entry. We can 
batch
-        // contiguous non-empty list slots into a single child.write() call, 
then
-        // fix up the rep_levels at list-slot boundaries using offsets 
directly.
-        //
-        // Kept as a separate function so the compiler can optimize 
write_list's
+        // Dispatch to separate functions so the compiler can optimize each
         // hot loop independently (function body size affects codegen quality).
         if is_last_level {
-            Self::write_list_last_level(child, ctx, offsets, nulls, range);
-            return;
+            Self::write_list_direct(child, ctx, offsets, nulls, range);
+        } else {
+            Self::write_list_scan(child, ctx, offsets, nulls, range);
         }
+    }
 
-        let offsets = &offsets[range.start..range.end + 1];
-
-        let write_non_null_slice =
-            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
-                child.write(start_idx..end_idx);
-                child.visit_leaves(|leaf| {
-                    let rep_levels = 
leaf.rep_levels.materialize_mut().unwrap();
-                    let mut rev = rep_levels.iter_mut().rev();
-                    let mut remaining = end_idx - start_idx;
+    /// Batch write for lists whose child not has nested repetition.
+    ///
+    /// direct means direct write the child rep-levels by offsets without scan.
+    fn write_list_direct<O: OffsetSizeTrait>(
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
+        offsets: &[O],
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
+        let list_start_rep = ctx.rep_level - 1;
 
-                    loop {
-                        let next = rev.next().unwrap();
-                        if *next > ctx.rep_level {
-                            // Nested element - ignore
-                            continue;
-                        }
+        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: 
&[O]| {
+            debug_assert!(run_offsets.len() >= 2);
+            let values_start = run_offsets[0].as_usize();
+            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+            debug_assert!(values_end > values_start);
 
-                        remaining -= 1;
-                        if remaining == 0 {
-                            *next = ctx.rep_level - 1;
-                            break;
-                        }
-                    }
-                })
-            };
+            child.write(values_start..values_end);
 
-        // In a list column, each row falls into one of three categories:
-        // - "null": the list slot is absent (!is_valid), encoded at def_level 
- 2
-        // - "empty": the list slot is present but has zero elements
-        //   (offsets[i] == offsets[i+1]), encoded at def_level - 1
-        // - non-empty: the list slot has child values, which are recursed into
-        //
-        // Consecutive runs of null or empty rows are batched and written 
together.
-        let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
-            if count > 0 {
-                child.visit_leaves(|leaf| {
-                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
-                    leaf.append_def_level_run(ctx.def_level - 2, count);
-                });
-            }
+            // The first element of each list slot needs rep_level =
+            // list_start_rep to mark a new list boundary. Because there's a 
1:1
+            // mapping between child elements and rep_level entries, the 
position
+            // of each slot's first element is directly computable from 
offsets.
+            child.visit_leaves(|leaf| {
+                debug_assert!(leaf.max_rep_level == ctx.rep_level);
+                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+                let batch_len = values_end - values_start;
+                let batch_base = rep_levels.len() - batch_len;
+                for slot_offset in run_offsets.iter().take(run_offsets.len() - 
1) {
+                    let pos = batch_base + (slot_offset.as_usize() - 
values_start);
+                    rep_levels[pos] = list_start_rep;
+                }
+            });
         };
 
-        let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
-            if count > 0 {
-                child.visit_leaves(|leaf| {
-                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
-                    leaf.append_def_level_run(ctx.def_level - 1, count);
-                });
-            }
-        };
+        Self::write_list_impl(child, ctx, offsets, nulls, range, 
emit_non_empty_run);
+    }
 
-        match nulls {
-            Some(nulls) => {
-                let null_offset = range.start;
-                let mut pending_nulls: usize = 0;
-                let mut pending_empties: usize = 0;
+    /// Batch write for lists whose child has nested repetition.
+    ///
+    /// After batch-writing child elements, scans backward through rep_levels
+    /// counting child-element starts to find and stamp slot boundaries.
+    ///
+    /// Scan backward because we don't know start offset before writing.
+    fn write_list_scan<O: OffsetSizeTrait>(

Review Comment:
   I previously want to remove the branch for write non-nested childs, however 
benchmark shows that adding a more branch will hurt the performance. So I split 
it to two functions.



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -352,131 +352,128 @@ impl LevelInfoBuilder {
             return;
         }
 
-        // Fast path for "last-level list": when the child has no nested 
rep_levels,
-        // each child element produces exactly one rep_level entry. We can 
batch
-        // contiguous non-empty list slots into a single child.write() call, 
then
-        // fix up the rep_levels at list-slot boundaries using offsets 
directly.
-        //
-        // Kept as a separate function so the compiler can optimize 
write_list's
+        // Dispatch to separate functions so the compiler can optimize each
         // hot loop independently (function body size affects codegen quality).
         if is_last_level {
-            Self::write_list_last_level(child, ctx, offsets, nulls, range);
-            return;
+            Self::write_list_direct(child, ctx, offsets, nulls, range);
+        } else {
+            Self::write_list_scan(child, ctx, offsets, nulls, range);
         }
+    }
 
-        let offsets = &offsets[range.start..range.end + 1];
-
-        let write_non_null_slice =
-            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
-                child.write(start_idx..end_idx);
-                child.visit_leaves(|leaf| {
-                    let rep_levels = 
leaf.rep_levels.materialize_mut().unwrap();
-                    let mut rev = rep_levels.iter_mut().rev();
-                    let mut remaining = end_idx - start_idx;
+    /// Batch write for lists whose child not has nested repetition.
+    ///
+    /// direct means direct write the child rep-levels by offsets without scan.
+    fn write_list_direct<O: OffsetSizeTrait>(
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
+        offsets: &[O],
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
+        let list_start_rep = ctx.rep_level - 1;
 
-                    loop {
-                        let next = rev.next().unwrap();
-                        if *next > ctx.rep_level {
-                            // Nested element - ignore
-                            continue;
-                        }
+        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: 
&[O]| {
+            debug_assert!(run_offsets.len() >= 2);
+            let values_start = run_offsets[0].as_usize();
+            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+            debug_assert!(values_end > values_start);
 
-                        remaining -= 1;
-                        if remaining == 0 {
-                            *next = ctx.rep_level - 1;
-                            break;
-                        }
-                    }
-                })
-            };
+            child.write(values_start..values_end);
 
-        // In a list column, each row falls into one of three categories:
-        // - "null": the list slot is absent (!is_valid), encoded at def_level 
- 2
-        // - "empty": the list slot is present but has zero elements
-        //   (offsets[i] == offsets[i+1]), encoded at def_level - 1
-        // - non-empty: the list slot has child values, which are recursed into
-        //
-        // Consecutive runs of null or empty rows are batched and written 
together.
-        let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
-            if count > 0 {
-                child.visit_leaves(|leaf| {
-                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
-                    leaf.append_def_level_run(ctx.def_level - 2, count);
-                });
-            }
+            // The first element of each list slot needs rep_level =
+            // list_start_rep to mark a new list boundary. Because there's a 
1:1
+            // mapping between child elements and rep_level entries, the 
position
+            // of each slot's first element is directly computable from 
offsets.
+            child.visit_leaves(|leaf| {
+                debug_assert!(leaf.max_rep_level == ctx.rep_level);
+                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+                let batch_len = values_end - values_start;
+                let batch_base = rep_levels.len() - batch_len;
+                for slot_offset in run_offsets.iter().take(run_offsets.len() - 
1) {
+                    let pos = batch_base + (slot_offset.as_usize() - 
values_start);
+                    rep_levels[pos] = list_start_rep;
+                }
+            });
         };
 
-        let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
-            if count > 0 {
-                child.visit_leaves(|leaf| {
-                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
-                    leaf.append_def_level_run(ctx.def_level - 1, count);
-                });
-            }
-        };
+        Self::write_list_impl(child, ctx, offsets, nulls, range, 
emit_non_empty_run);
+    }
 
-        match nulls {
-            Some(nulls) => {
-                let null_offset = range.start;
-                let mut pending_nulls: usize = 0;
-                let mut pending_empties: usize = 0;
+    /// Batch write for lists whose child has nested repetition.
+    ///
+    /// After batch-writing child elements, scans backward through rep_levels
+    /// counting child-element starts to find and stamp slot boundaries.
+    ///
+    /// Scan backward because we don't know start offset before writing.
+    fn write_list_scan<O: OffsetSizeTrait>(
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
+        offsets: &[O],
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
+        let list_start_rep = ctx.rep_level - 1;
 
-                // TODO: Faster bitmask iteration (#1757)
-                for (idx, w) in offsets.windows(2).enumerate() {
-                    let is_valid = nulls.is_valid(idx + null_offset);
-                    let start_idx = w[0].as_usize();
-                    let end_idx = w[1].as_usize();
+        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: 
&[O]| {
+            debug_assert!(run_offsets.len() >= 2);
+            let values_start = run_offsets[0].as_usize();
+            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+            debug_assert!(values_end > values_start);
 
-                    if !is_valid {
-                        write_empty_run(child, pending_empties);
-                        pending_empties = 0;
-                        pending_nulls += 1;
-                    } else if start_idx == end_idx {
-                        write_null_run(child, pending_nulls);
-                        pending_nulls = 0;
-                        pending_empties += 1;
-                    } else {
-                        write_null_run(child, pending_nulls);
-                        pending_nulls = 0;
-                        write_empty_run(child, pending_empties);
-                        pending_empties = 0;
-                        write_non_null_slice(child, start_idx, end_idx);
+            child.write(values_start..values_end);
+
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+
+                if leaf.max_rep_level == ctx.rep_level {
+                    // This algorithm is same as write_list_direct.
+                    // Use separate function because the branch code size 
would affect codegen
+                    // quality of the hot loop of write_list_direct.
+                    let batch_len = values_end - values_start;
+                    let batch_base = rep_levels.len() - batch_len;
+                    for slot_offset in 
run_offsets.iter().take(run_offsets.len() - 1) {
+                        let pos = batch_base + (slot_offset.as_usize() - 
values_start);
+                        rep_levels[pos] = list_start_rep;
                     }
-                }
-                write_null_run(child, pending_nulls);
-                write_empty_run(child, pending_empties);
-            }
-            None => {
-                let mut pending_empties: usize = 0;
-                for w in offsets.windows(2) {
-                    let start_idx = w[0].as_usize();
-                    let end_idx = w[1].as_usize();
-                    if start_idx == end_idx {
-                        pending_empties += 1;
-                    } else {
-                        write_empty_run(child, pending_empties);
-                        pending_empties = 0;
-                        write_non_null_slice(child, start_idx, end_idx);
+                } else {
+                    // Backward scan: count child-element starts (rep <= 
ctx.rep_level)
+                    // and stamp list_start_rep at slot boundaries.
+                    let mut slot_bounds = run_offsets[..run_offsets.len() - 
1].iter().rev();
+                    let mut next_stamp_at = values_end - 
slot_bounds.next().unwrap().as_usize();
+                    let mut seen = 0usize;
+
+                    for rep in rep_levels.iter_mut().rev() {
+                        // This can uses `==`, since list write is recursive 
and the child is written
+                        // before the parent.
+                        if *rep <= ctx.rep_level {

Review Comment:
   I still use `<=` because benchmark shows no performance enhancement between 
`<=` and `==`, so just uses `<=`



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -352,131 +352,128 @@ impl LevelInfoBuilder {
             return;
         }
 
-        // Fast path for "last-level list": when the child has no nested 
rep_levels,
-        // each child element produces exactly one rep_level entry. We can 
batch
-        // contiguous non-empty list slots into a single child.write() call, 
then
-        // fix up the rep_levels at list-slot boundaries using offsets 
directly.
-        //
-        // Kept as a separate function so the compiler can optimize 
write_list's
+        // Dispatch to separate functions so the compiler can optimize each
         // hot loop independently (function body size affects codegen quality).
         if is_last_level {
-            Self::write_list_last_level(child, ctx, offsets, nulls, range);
-            return;
+            Self::write_list_direct(child, ctx, offsets, nulls, range);
+        } else {
+            Self::write_list_scan(child, ctx, offsets, nulls, range);
         }
+    }
 
-        let offsets = &offsets[range.start..range.end + 1];
-
-        let write_non_null_slice =
-            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
-                child.write(start_idx..end_idx);
-                child.visit_leaves(|leaf| {
-                    let rep_levels = 
leaf.rep_levels.materialize_mut().unwrap();
-                    let mut rev = rep_levels.iter_mut().rev();
-                    let mut remaining = end_idx - start_idx;
+    /// Batch write for lists whose child not has nested repetition.
+    ///
+    /// direct means direct write the child rep-levels by offsets without scan.
+    fn write_list_direct<O: OffsetSizeTrait>(
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
+        offsets: &[O],
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
+        let list_start_rep = ctx.rep_level - 1;
 
-                    loop {
-                        let next = rev.next().unwrap();
-                        if *next > ctx.rep_level {
-                            // Nested element - ignore
-                            continue;
-                        }
+        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: 
&[O]| {
+            debug_assert!(run_offsets.len() >= 2);
+            let values_start = run_offsets[0].as_usize();
+            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+            debug_assert!(values_end > values_start);
 
-                        remaining -= 1;
-                        if remaining == 0 {
-                            *next = ctx.rep_level - 1;
-                            break;
-                        }
-                    }
-                })
-            };
+            child.write(values_start..values_end);
 
-        // In a list column, each row falls into one of three categories:
-        // - "null": the list slot is absent (!is_valid), encoded at def_level 
- 2
-        // - "empty": the list slot is present but has zero elements
-        //   (offsets[i] == offsets[i+1]), encoded at def_level - 1
-        // - non-empty: the list slot has child values, which are recursed into
-        //
-        // Consecutive runs of null or empty rows are batched and written 
together.
-        let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
-            if count > 0 {
-                child.visit_leaves(|leaf| {
-                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
-                    leaf.append_def_level_run(ctx.def_level - 2, count);
-                });
-            }
+            // The first element of each list slot needs rep_level =
+            // list_start_rep to mark a new list boundary. Because there's a 
1:1
+            // mapping between child elements and rep_level entries, the 
position
+            // of each slot's first element is directly computable from 
offsets.
+            child.visit_leaves(|leaf| {
+                debug_assert!(leaf.max_rep_level == ctx.rep_level);
+                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+                let batch_len = values_end - values_start;
+                let batch_base = rep_levels.len() - batch_len;
+                for slot_offset in run_offsets.iter().take(run_offsets.len() - 
1) {
+                    let pos = batch_base + (slot_offset.as_usize() - 
values_start);
+                    rep_levels[pos] = list_start_rep;
+                }
+            });
         };
 
-        let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
-            if count > 0 {
-                child.visit_leaves(|leaf| {
-                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
-                    leaf.append_def_level_run(ctx.def_level - 1, count);
-                });
-            }
-        };
+        Self::write_list_impl(child, ctx, offsets, nulls, range, 
emit_non_empty_run);
+    }
 
-        match nulls {
-            Some(nulls) => {
-                let null_offset = range.start;
-                let mut pending_nulls: usize = 0;
-                let mut pending_empties: usize = 0;
+    /// Batch write for lists whose child has nested repetition.
+    ///
+    /// After batch-writing child elements, scans backward through rep_levels
+    /// counting child-element starts to find and stamp slot boundaries.
+    ///
+    /// Scan backward because we don't know start offset before writing.
+    fn write_list_scan<O: OffsetSizeTrait>(
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
+        offsets: &[O],
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
+        let list_start_rep = ctx.rep_level - 1;
 
-                // TODO: Faster bitmask iteration (#1757)
-                for (idx, w) in offsets.windows(2).enumerate() {
-                    let is_valid = nulls.is_valid(idx + null_offset);
-                    let start_idx = w[0].as_usize();
-                    let end_idx = w[1].as_usize();
+        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: 
&[O]| {
+            debug_assert!(run_offsets.len() >= 2);
+            let values_start = run_offsets[0].as_usize();
+            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+            debug_assert!(values_end > values_start);
 
-                    if !is_valid {
-                        write_empty_run(child, pending_empties);
-                        pending_empties = 0;
-                        pending_nulls += 1;
-                    } else if start_idx == end_idx {
-                        write_null_run(child, pending_nulls);
-                        pending_nulls = 0;
-                        pending_empties += 1;
-                    } else {
-                        write_null_run(child, pending_nulls);
-                        pending_nulls = 0;
-                        write_empty_run(child, pending_empties);
-                        pending_empties = 0;
-                        write_non_null_slice(child, start_idx, end_idx);
+            child.write(values_start..values_end);
+
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+
+                if leaf.max_rep_level == ctx.rep_level {
+                    // This algorithm is same as write_list_direct.
+                    // Use separate function because the branch code size 
would affect codegen
+                    // quality of the hot loop of write_list_direct.
+                    let batch_len = values_end - values_start;
+                    let batch_base = rep_levels.len() - batch_len;
+                    for slot_offset in 
run_offsets.iter().take(run_offsets.len() - 1) {
+                        let pos = batch_base + (slot_offset.as_usize() - 
values_start);
+                        rep_levels[pos] = list_start_rep;
                     }
-                }
-                write_null_run(child, pending_nulls);
-                write_empty_run(child, pending_empties);
-            }
-            None => {
-                let mut pending_empties: usize = 0;
-                for w in offsets.windows(2) {
-                    let start_idx = w[0].as_usize();
-                    let end_idx = w[1].as_usize();
-                    if start_idx == end_idx {
-                        pending_empties += 1;
-                    } else {
-                        write_empty_run(child, pending_empties);
-                        pending_empties = 0;
-                        write_non_null_slice(child, start_idx, end_idx);
+                } else {
+                    // Backward scan: count child-element starts (rep <= 
ctx.rep_level)
+                    // and stamp list_start_rep at slot boundaries.
+                    let mut slot_bounds = run_offsets[..run_offsets.len() - 
1].iter().rev();
+                    let mut next_stamp_at = values_end - 
slot_bounds.next().unwrap().as_usize();
+                    let mut seen = 0usize;
+
+                    for rep in rep_levels.iter_mut().rev() {
+                        // This can uses `==`, since list write is recursive 
and the child is written
+                        // before the parent.
+                        if *rep <= ctx.rep_level {
+                            seen += 1;
+                            if seen == next_stamp_at {

Review Comment:
   Maybe we can SIMD this in the future, this is not cirtical in this branch. 
Or we can "batching" if list offsets is large. E.g. checking not one-by-one, 
and just batch by batch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to