This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 578030c895 parquet: rle skip decode loop when batch contains all max levels (aka no nulls) (#9258)
578030c895 is described below

commit 578030c895486a300c594b25a7f2bc64841aacd0
Author: Lanqing Yang <[email protected]>
AuthorDate: Thu Feb 5 14:23:07 2026 -0800

    parquet: rle skip decode loop when batch contains all max levels (aka no nulls) (#9258)
    
    # Which issue does this PR close?
    
    
    
    - Closes #NNN.
    
    # Rationale for this change
    Improves Parquet read performance: when the current RLE run encodes the
    max definition level (`rle_value` is true) and has enough values left
    (`rle_left`) to cover the whole batch, we can skip the decode loop and
    the `count_set_bits` overhead of building the null bitmap.
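
    The core of the fast path, shown as a minimal standalone sketch (the
    `RleRun` struct and `main` below are illustrative stand-ins for the real
    `PackedDecoder` state, not code from this diff; `BooleanBufferBuilder`
    and its `append_n` are the actual builder APIs the change uses):

    ```rust
    use arrow_buffer::builder::BooleanBufferBuilder;

    /// Illustrative model of the decoder state the fast path consults.
    struct RleRun {
        rle_left: usize, // values remaining in the current RLE run
        rle_value: bool, // true => the run encodes the max definition level
    }

    impl RleRun {
        /// Consume `len` levels without decoding when the active run is
        /// all-valid and long enough; otherwise signal the normal decode loop.
        fn try_consume_all_valid(&mut self, len: usize) -> Option<usize> {
            if self.rle_left >= len && self.rle_value {
                self.rle_left -= len;
                Some(len)
            } else {
                None
            }
        }
    }

    fn main() {
        let mut run = RleRun { rle_left: 100, rle_value: true };
        let mut nulls = BooleanBufferBuilder::new(100);
        if let Some(count) = run.try_consume_all_valid(64) {
            // Append set bits in bulk instead of decoding level-by-level
            nulls.append_n(count, true);
        }
        assert_eq!(nulls.len(), 64);
        assert_eq!(run.rle_left, 36);
    }
    ```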
    
    
    # What changes are included in this PR?

    - Adds `PackedDecoder::try_consume_all_valid`, a fast path that consumes an
      entire batch from a single all-valid RLE run without decoding it.
    - Changes `DefinitionLevelBuffer::consume_bitmask` to return `Option<Buffer>`,
      yielding `None` when no bitmap was built.
    - Updates the record reader to flatten the new `Option` with `and_then`.

    # Are these changes tested?

    Yes, a new unit test, `test_try_consume_all_valid`, covers the all-valid,
    all-null, partial-run, and exhausted-run cases.

    # Are there any user-facing changes?

    `DefinitionLevelBuffer::consume_bitmask` now returns `Option<Buffer>`
    instead of `Buffer`.
---
 .../src/arrow/record_reader/definition_levels.rs   | 130 ++++++++++++++++++++-
 parquet/src/arrow/record_reader/mod.rs             |   2 +-
 2 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index f51dee5c5c..9e7345e2d7 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -90,13 +90,23 @@ impl DefinitionLevelBuffer {
         }
     }
 
-    /// Returns the built null bitmask
-    pub fn consume_bitmask(&mut self) -> Buffer {
+    /// Returns the built null bitmask, or None if all values are valid
+    pub fn consume_bitmask(&mut self) -> Option<Buffer> {
         self.len = 0;
-        match &mut self.inner {
-            BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
-            BufferInner::Mask { nulls } => nulls.finish().into_inner(),
+        let nulls = match &mut self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        };
+
+        // Always call finish() to reset the builder state for the next batch.
+        let buffer = nulls.finish().into_inner();
+
+        // If no bitmap was constructed, return None
+        if buffer.is_empty() {
+            return None;
         }
+
+        Some(buffer)
     }
 
     pub fn nulls(&self) -> &BooleanBufferBuilder {
@@ -171,6 +181,15 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
             (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
                 assert_eq!(self.max_level, 1);
 
+                // Fast path: if all requested levels are valid (max definition level),
+                // we can skip RLE decoding and just append all-ones to the bitmap.
+                // This is faster than decoding RLE data.
+                if let Some(count) = decoder.try_consume_all_valid(num_levels)? {
+                    nulls.append_n(count, true);
+                    return Ok((count, count)); // values_read == levels_read when all valid
+                }
+
+                // Normal path: decode RLE data into the bitmap
                 let start = nulls.len();
                 let levels_read = decoder.read(nulls, num_levels)?;
 
@@ -286,6 +305,37 @@ impl PackedDecoder {
         self.data_offset = 0;
     }
 
+    /// Try to consume `len` levels if all are valid (max definition level).
+    ///
+    /// Returns `Ok(Some(count))` if successfully consumed `count` all-valid levels.
+    /// Returns `Ok(None)` if there are any nulls or packed data that prevents the fast path.
+    ///
+    /// Note: On `None`, the decoder state may have advanced to the next RLE block,
+    /// but only if `rle_left` was zero (i.e., the block would have been loaded
+    /// on the next read anyway).
+    fn try_consume_all_valid(&mut self, len: usize) -> Result<Option<usize>> {
+        // If no active run and no packed data pending, try to parse the next RLE block
+        if self.rle_left == 0 && self.packed_count == self.packed_offset {
+            if self.data_offset < self.data.len() {
+                self.next_rle_block()?;
+            } else {
+                // No more data available
+                return Ok(None);
+            }
+        }
+
+        // Fast path only works when we have an active RLE run of true values
+        // that covers the entire requested length.
+        if self.rle_left >= len && self.rle_value {
+            self.rle_left -= len;
+            return Ok(Some(len));
+        }
+
+        // Any other case (null run, packed data, or insufficient length)
+        // falls back to normal path
+        Ok(None)
+    }
+
     fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
         let mut read = 0;
         while read != len {
@@ -447,4 +497,74 @@ mod tests {
         assert_eq!(read_level + skip_level, len);
         assert_eq!(read_value + skip_value, total_value);
     }
+
+    #[test]
+    fn test_try_consume_all_valid() {
+        // Test with all-valid data (all 1s) - single RLE run
+        let len = 100;
+        let mut encoder = RleEncoder::new(1, 1024);
+        for _ in 0..len {
+            encoder.put(1); // all valid
+        }
+        let encoded = encoder.consume();
+        let mut decoder = PackedDecoder::new();
+        decoder.set_data(Encoding::RLE, encoded.into());
+
+        // try_consume_all_valid now parses the RLE block itself, no need to read first
+        let result = decoder.try_consume_all_valid(len).unwrap();
+        assert_eq!(result, Some(len));
+
+        // Test with all-null data (all 0s)
+        let mut encoder = RleEncoder::new(1, 1024);
+        for _ in 0..len {
+            encoder.put(0); // all null
+        }
+        let encoded = encoder.consume();
+        let mut decoder = PackedDecoder::new();
+        decoder.set_data(Encoding::RLE, encoded.into());
+
+        // Should return None because rle_value is false (all nulls)
+        let result = decoder.try_consume_all_valid(len).unwrap();
+        assert_eq!(result, None);
+
+        // Test when requesting more than available in current RLE run
+        let mut encoder = RleEncoder::new(1, 1024);
+        for _ in 0..10 {
+            encoder.put(1); // small run of valid
+        }
+        for _ in 0..10 {
+            encoder.put(0); // followed by nulls
+        }
+        let encoded = encoder.consume();
+        let mut decoder = PackedDecoder::new();
+        decoder.set_data(Encoding::RLE, encoded.into());
+
+        // Request more than the valid run - should return None
+        // (because we don't look ahead to next block)
+        let result = decoder.try_consume_all_valid(20).unwrap();
+        assert_eq!(result, None);
+
+        // Reset decoder and try requesting within the run
+        decoder.set_data(Encoding::RLE, {
+            let mut encoder = RleEncoder::new(1, 1024);
+            for _ in 0..10 {
+                encoder.put(1);
+            }
+            for _ in 0..10 {
+                encoder.put(0);
+            }
+            encoder.consume().into()
+        });
+
+        let result = decoder.try_consume_all_valid(5).unwrap();
+        assert_eq!(result, Some(5));
+
+        // After skipping 5, we should have 5 left in the valid RLE run
+        let result = decoder.try_consume_all_valid(5).unwrap();
+        assert_eq!(result, Some(5));
+
+        // Now the valid run is exhausted, next call should parse the null run and return None
+        let result = decoder.try_consume_all_valid(5).unwrap();
+        assert_eq!(result, None);
+    }
 }
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 758aea6ede..2092b4972d 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -193,7 +193,7 @@ where
         let mask = self
             .def_levels
             .as_mut()
-            .map(|levels| levels.consume_bitmask());
+            .and_then(|levels| levels.consume_bitmask());
 
         // While we always consume the bitmask here, we only want to return
         // the bitmask for nullable arrays. (Marking nulls on a non-nullable

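For context on the final hunk: because `consume_bitmask` now returns
`Option<Buffer>`, keeping `map` would produce a nested
`Option<Option<Buffer>>`; `and_then` flattens it. A minimal standalone
sketch of the distinction, with a plain `Vec<u8>` standing in for `Buffer`:

```rust
/// Stand-in for the new `consume_bitmask`: `None` means no bitmap was built
/// (i.e. every value was valid).
fn consume_bitmask() -> Option<Vec<u8>> {
    None
}

fn main() {
    let def_levels = Some(()); // stand-in for `self.def_levels.as_mut()`

    // `map` nests the options: Option<Option<Vec<u8>>>
    let nested = def_levels.map(|_| consume_bitmask());
    assert_eq!(nested, Some(None));

    // `and_then` flattens them, so "no bitmap" surfaces as a plain None mask
    let mask = def_levels.and_then(|_| consume_bitmask());
    assert_eq!(mask, None);
}
```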