This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 11f13a50a1 feat(parquet): batch RLE runs in level encoder via scan-ahead (#9830)
11f13a50a1 is described below

commit 11f13a50a1abb483eaa77da24e90cdf0accee244
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Tue Apr 28 16:13:44 2026 -0400

    feat(parquet): batch RLE runs in level encoder via scan-ahead (#9830)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Spun off from #9653
    - Contributes to #9731
    
    # Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    See #9731
    
    # What changes are included in this PR?
    
    Add `is_accumulating_rle()` and `extend_run()` methods to `RleEncoder`
    that allow callers to detect when the encoder is in RLE accumulation
    mode and bulk-extend runs without per-element overhead.
    
    Upgrade `put_with_observer()` in `LevelEncoder` to exploit this: after
    each `put()`, it checks whether the encoder is in accumulation mode.
    If so, it scans ahead for the rest of the run, calls `extend_run()` to
    add those values in O(1), and fires the observer once for the current
    value plus the scanned-ahead remainder of the run.
    
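    For the remainder of each RLE run, this replaces the previous per-value
    `put()` and observer calls with a single `extend_run()` and a single
    observer call (the scan-ahead itself is a tight comparison loop). This is
    a significant improvement for sparse columns, where long runs of identical
    levels are common.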
    
    # Are these changes tested?
    
    All existing tests pass; added a test covering the point at which the
    encoder enters RLE accumulation mode.
    
    # Are there any user-facing changes?
    
    None.
    
    ---------
    
    Signed-off-by: Hippolyte Barraud <[email protected]>
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/encodings/levels.rs | 21 ++++++++++++---
 parquet/src/encodings/rle.rs    | 60 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 144914a4f5..6b6de8b0e1 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -53,7 +53,8 @@ impl LevelEncoder {
     }

     /// Put/encode levels vector into this level encoder and call
-    /// `observer(value, count)` for each value encountered during encoding.
+    /// `observer(value, count)` for each batch of `count` consecutive occurrences
+    /// of `value`; a single run of identical values may be reported over several calls.
     ///
     /// Returns number of encoded values that are less than or equal to length
     /// of the input buffer.
@@ -68,9 +69,23 @@ impl LevelEncoder {
         {
         match *self {
             LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
-                for &value in buffer {
+                let mut remaining = buffer;
+                while let Some((&value, rest)) = remaining.split_first() {
                     encoder.put(value as u64);
-                    observer(value, 1);
+                    // After put(), check whether the encoder is in RLE accumulation
+                    // mode for this value. If so, scan ahead for the rest of this
+                    // run, bulk-extend it, and report it with a single observer call.
+                    if encoder.is_accumulating_rle(value as u64) {
+                        let run_len = rest.iter().take_while(|&&v| v == value).count();
+                        if run_len > 0 {
+                            encoder.extend_run(run_len);
+                        }
+                        observer(value, 1 + run_len);
+                        remaining = &rest[run_len..];
+                    } else {
+                        observer(value, 1);
+                        remaining = rest;
+                    }
                 }
                 buffer.len()
             }
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 806b41a353..47e52d00b0 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -129,6 +129,30 @@ impl RleEncoder {
         bit_packed_max_size.max(rle_max_size)
     }
 
+    /// Returns `true` if the encoder is currently in RLE accumulation mode
+    /// for the given value (i.e., `repeat_count >= BIT_PACK_GROUP_SIZE` and `current_value == value`).
+    ///
+    /// The encoder enters accumulation mode once a full staging group of
+    /// `BIT_PACK_GROUP_SIZE` values holds the same value (e.g. on the 8th identical value
+    /// put into a fresh encoder): at that point `flush_buffered_values` has committed the
+    /// RLE decision and cleared the staging buffer, so no more per-element work is needed.
+    /// Callers may use [`extend_run`](Self::extend_run) to add further repetitions in O(1).
+    #[inline]
+    pub fn is_accumulating_rle(&self, value: u64) -> bool {
+        self.repeat_count >= BIT_PACK_GROUP_SIZE && self.current_value == value
+    }
+
+    /// Extends the current RLE run by `count` additional repetitions.
+    ///
+    /// # Preconditions
+    /// The caller **must** have verified [`is_accumulating_rle`](Self::is_accumulating_rle)
+    /// returns `true` for the same value first; this is only checked via `debug_assert!`.
+    #[inline]
+    pub fn extend_run(&mut self, count: usize) {
+        debug_assert!(self.repeat_count >= BIT_PACK_GROUP_SIZE);
+        self.repeat_count += count;
+    }
+
     /// Encodes `value`, which must be representable with `bit_width` bits.
     #[inline]
     pub fn put(&mut self, value: u64) {
@@ -1024,6 +1048,42 @@ mod tests {
         assert_eq!(actual_values[7], 0);
     }
 
+    /// A fresh encoder enters RLE accumulation mode exactly on the 8th consecutive
+    /// identical value, and `extend_run` from that state round-trips through the decoder.
+    #[test]
+    fn test_is_accumulating_rle_boundary() {
+        let bit_width = 2;
+        let value = 1u64;
+
+        // 7 identical values: staging group not yet full, so not accumulating
+        let mut enc = RleEncoder::new(bit_width, 256);
+        for _ in 0..7 {
+            enc.put(value);
+        }
+        assert!(
+            !enc.is_accumulating_rle(value),
+            "should not be accumulating after 7 values"
+        );
+
+        // 8th value fills the staging group and tips into accumulation
+        enc.put(value);
+        assert!(
+            enc.is_accumulating_rle(value),
+            "should be accumulating after 8 values"
+        );
+
+        // extend_run from that state and verify the decoded round-trip
+        enc.extend_run(92); // total: 8 + 92 = 100 identical values
+        let encoded = enc.consume();
+
+        let mut dec = RleDecoder::new(bit_width);
+        dec.set_data(encoded.into()).unwrap();
+        let mut out = vec![0i32; 100];
+        let n = dec.get_batch::<i32>(&mut out).unwrap();
+        assert_eq!(n, 100);
+        assert!(out.iter().all(|&v| v == value as i32));
+    }
+
     #[test]
     fn test_long_run() {
         // This writer does not write runs longer than 504 values as this 
allows

Reply via email to