alamb commented on code in PR #9848:
URL: https://github.com/apache/arrow-rs/pull/9848#discussion_r3382893736


##########
arrow-buffer/src/builder/boolean.rs:
##########
@@ -619,6 +659,102 @@ mod tests {
         }
     }
 
+    /// Helper: build via append_word and verify against bit-by-bit append.
+    fn check_append_word(initial_bits: usize, word: u64, count: usize) {
+        let mut got = BooleanBufferBuilder::new(0);
+        let mut expected = BooleanBufferBuilder::new(0);
+        got.append_n(initial_bits, true);
+        expected.append_n(initial_bits, true);
+        got.append_word(word, count);
+        for i in 0..count {
+            expected.append(word & (1 << i) != 0);
+        }
+        assert_eq!(got.len(), expected.len());
+        assert_eq!(got.finish(), expected.finish());
+    }
+
+    #[test]
+    fn test_append_word_zero_count() {
+        check_append_word(0, u64::MAX, 0);
+        check_append_word(3, u64::MAX, 0);
+    }
+
+    #[test]
+    fn test_append_word_aligned() {
+        for count in [1, 5, 8, 17, 64] {
+            check_append_word(0, 0xDEAD_BEEF_CAFE_BABE, count);

Review Comment:
   This brings me back to 6.004 in my college days! 



##########
parquet/src/util/bit_util.rs:
##########
@@ -944,6 +982,45 @@ mod tests {
     use rand::distr::{Distribution, StandardUniform};
     use std::fmt::Debug;
 
+    #[test]
+    #[cfg(feature = "arrow")]

Review Comment:
   why does this need the "arrow" feature?



##########
parquet/src/util/bit_util.rs:
##########
@@ -936,6 +936,44 @@ impl From<Vec<u8>> for BitReader {
     }
 }
 
+/// Parallel bit extract: for each set bit in `mask`, extract the
+/// corresponding bit from `value` and pack them contiguously into the low
+/// bits of the return value.
+///
+/// Equivalent to the x86 BMI2 `PEXT` instruction. When compiled with the
+/// `bmi2` target feature enabled (for example `-C target-cpu=x86-64-v3`)
+/// this lowers to the hardware `pext` instruction; otherwise it falls back
+/// to a portable scalar loop.
+///
+/// Replace with `value.compress(mask)` when `uint_gather_scatter_bits`
+/// is stabilised: <https://github.com/rust-lang/rust/issues/149069>
+#[inline]

Review Comment:
   FYI @XiangpengHao -- we are finally getting BMI instructions!
   
   It would be sweet to start using more of this in other places, such as
   `filter_null_mask`. Perhaps as a follow-on PR we can move this into
   arrow-buffer and see if we can get a performance gain.
   
   I'll file a ticket
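
For readers unfamiliar with PEXT, the "portable scalar loop" mentioned in the doc comment could look roughly like the sketch below. `pext_scalar` is a hypothetical name used only for illustration; it is not the function added in this PR, and the actual fallback may be written differently.

```rust
/// Illustrative software fallback for parallel bit extract (PEXT).
/// For each set bit in `mask`, from lowest to highest, copy the matching
/// bit of `value` into the next free low bit of the result.
/// NOTE: `pext_scalar` is a hypothetical name, not part of arrow-rs.
fn pext_scalar(value: u64, mut mask: u64) -> u64 {
    let mut result = 0u64;
    let mut out_bit = 0u32;
    while mask != 0 {
        // Isolate the lowest set bit of the mask.
        let lowest = mask & mask.wrapping_neg();
        if value & lowest != 0 {
            result |= 1u64 << out_bit;
        }
        out_bit += 1;
        // Clear the lowest set bit and continue with the next one.
        mask &= mask - 1;
    }
    result
}
```

The loop runs once per set bit in `mask`, so it is cheapest for sparse masks; the hardware instruction avoids the loop entirely.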



##########
parquet/src/arrow/array_reader/struct_array.rs:
##########
@@ -82,107 +88,68 @@ impl ArrayReader for StructArrayReader {
         Ok(read.unwrap_or(0))
     }
 
-    /// Consume struct records.
-    ///
-    /// Definition levels of struct array is calculated as following:
-    /// ```ignore
-    /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ...,
-    /// childn_def_levels[i]);
-    /// ```
-    ///
-    /// Repetition levels of struct array is calculated as following:
-    /// ```ignore
-    /// rep_levels[i] = child1_rep_levels[i];
-    /// ```
-    ///
-    /// The null bitmap of struct array is calculated from def_levels:
-    /// ```ignore
-    /// null_bitmap[i] = (def_levels[i] >= self.def_level);
-    /// ```
-    ///
     fn consume_batch(&mut self) -> Result<ArrayRef> {
         if self.children.is_empty() {
             return Ok(Arc::new(StructArray::from(Vec::new())));
         }
 
-        let children_array = self
+        let children_arrays = self
             .children
             .iter_mut()
             .map(|reader| reader.consume_batch())
             .collect::<Result<Vec<_>>>()?;
 
-        // check that array child data has same size
-        let children_array_len = children_array
-            .first()
-            .map(|arr| arr.len())
-            .ok_or_else(|| general_err!("Struct array reader should have at least one child!"))?;
-
-        let all_children_len_eq = children_array
-            .iter()
-            .all(|arr| arr.len() == children_array_len);
-        if !all_children_len_eq {
-            return Err(general_err!("Not all children array length are the same!"));
-        }
-
         let DataType::Struct(fields) = &self.data_type else {
             return Err(general_err!(
                 "Internal: StructArrayReader must have struct data type, got {:?}",
                 self.data_type
             ));
         };
-        let fields = fields.clone(); // cloning Fields is cheap (Arc internally)
+        let fields = fields.clone();
 
-        let mut nulls = None;
-        if self.nullable {
-            // calculate struct def level data
+        let item_count = children_arrays.first().map(|a| a.len()).unwrap_or(0);
 
-            // children should have consistent view of parent, only need to inspect first child
+        if !children_arrays.windows(2).all(|w| w[0].len() == w[1].len()) {
+            return Err(general_err!("Not all children array length are the same!"));
+        }
+
+        // Build struct null bitmap if the struct is nullable.
+        // We iterate def/rep levels and select entries that correspond to
+        // struct rows: skip parent-level padding (d < threshold) and
+        // inner-list continuations (r > struct_rep_level).
+        let nulls = if self.nullable {
             let def_levels = self.children[0]
                 .get_def_levels()
-                .expect("child with nullable parents must have definition level");
-
-            // calculate bitmap for current array
-            let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
-
-            match self.children[0].get_rep_levels() {
-                Some(rep_levels) => {
-                    // Sanity check
-                    assert_eq!(rep_levels.len(), def_levels.len());
-
-                    for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
-                        if rep_level > &self.struct_rep_level {
-                            // Already handled by inner list - SKIP
-                            continue;
-                        }
-                        bitmap_builder.append(*def_level >= self.struct_def_level)
-                    }
-                }
-                None => {
-                    // Safety: slice iterator has a trusted length
-                    unsafe {
-                        bitmap_builder.extend_trusted_len(
-                            def_levels
-                                .iter()
-                                .map(|level| *level >= self.struct_def_level),
-                        )
-                    }
-                }
+                .ok_or_else(|| general_err!("child def levels are None"))?;
+            let rep_levels = self.children[0].get_rep_levels();
+
+            let mut bitmap = BooleanBufferBuilder::new(item_count);
+            crate::arrow::record_reader::definition_levels::build_filtered_validity_bitmap(

Review Comment:
   Minor nit: most of the rest of the code imports functions with `use`,
   so instead of
   ```rust
               crate::arrow::record_reader::definition_levels::build_filtered_validity_bitmap(
   ```
   
   this:
   ```rust
               use crate::arrow::record_reader::definition_levels::build_filtered_validity_bitmap;
   ...
               build_filtered_validity_bitmap(...)
   ```
   



##########
parquet/src/arrow/array_reader/list_array.rs:
##########
@@ -97,121 +108,81 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
             .get_rep_levels()
             .ok_or_else(|| general_err!("item_reader rep levels are None."))?;
 
-        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
-            return Err(general_err!(
-                "offset of {} would overflow list array",
-                next_batch_array.len()
-            ));
+        if def_levels.is_empty() {
+            return Ok(new_empty_array(&self.data_type));
         }
 
-        if !rep_levels.is_empty() && rep_levels[0] != 0 {
-            // This implies either the source data was invalid, or the leaf column
-            // reader did not correctly delimit semantic records
+        if rep_levels[0] != 0 {
             return Err(general_err!("first repetition level of batch must be 0"));
         }
 
-        // A non-nullable list has a single definition level indicating if the list is empty
-        //
-        // A nullable list has two definition levels associated with it:
-        //
-        // The first identifies if the list is null
-        // The second identifies if the list is empty
-        //
-        // The child data returned above is padded with a value for each not-fully defined level.
-        // Therefore null and empty lists will correspond to a value in the child array.
-        //
-        // Whilst nulls may have a non-zero slice in the offsets array, empty lists must
-        // be of zero length. As a result we MUST filter out values corresponding to empty
-        // lists, and for consistency we do the same for nulls.
-
-        // The output offsets for the computed ListArray
-        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);
-
-        // The validity mask of the computed ListArray if nullable
-        let mut validity = self
-            .nullable
-            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
-
-        // The offset into the filtered child data of the current level being considered
-        let mut cur_offset = 0;
-
-        // Identifies the start of a run of values to copy from the source child data
-        let mut filter_start = None;
-
-        // The number of child values skipped due to empty lists or nulls
-        let mut skipped = 0;
-
-        // Builder used to construct the filtered child data, skipping empty lists and nulls
-        let data = next_batch_array.to_data();
-        let mut child_data_builder =
-            MutableArrayData::new(vec![&data], false, next_batch_array.len());
-
-        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
-            match r.cmp(&self.rep_level) {
-                Ordering::Greater => {
-                    // Repetition level greater than current => already handled by inner array
-                    if *d < self.def_level {
-                        return Err(general_err!(
-                            "Encountered repetition level too large for definition level"
-                        ));
-                    }
-                }
-                Ordering::Equal => {
-                    // New value in the current list
-                    cur_offset += 1;
-                }
-                Ordering::Less => {
-                    // Create new array slice
-                    // Already checked that this cannot overflow
-                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());
-
-                    if *d >= self.def_level {
-                        // Fully defined value
+        if OffsetSize::from_usize(child_array.len()).is_none() {
+            return Err(general_err!(
+                "offset of {} would overflow list array",
+                child_array.len()
+            ));
+        }
 
-                        // Record current offset if it is None
-                        filter_start.get_or_insert(cur_offset + skipped);
+        let levels_len = def_levels.len();
+        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(levels_len + 1);
+        let mut validity = self.nullable.then(|| BooleanBufferBuilder::new(levels_len));
 
-                        cur_offset += 1;
+        let mut cur_offset: usize = 0;
 
-                        if let Some(validity) = validity.as_mut() {
-                            validity.append(true)
-                        }
-                    } else {
-                        // Flush the current slice of child values if any
-                        if let Some(start) = filter_start.take() {
-                            child_data_builder.extend(0, start, cur_offset + skipped);
-                        }
+        for (&d, &r) in def_levels.iter().zip(rep_levels) {
+            // When this list is a child of another list, skip entries
+            // belonging to null/empty parent lists.
+            if let Some(threshold) = self.parent_threshold {

Review Comment:
   If you wanted to squeeze more performance out of this loop (perhaps as a
   follow-on), you could generate specialized versions for the cases where
   `parent_threshold` and `validity` are `None`, rather than checking on each row.
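
One way to do this, sketched under assumptions: hoist the `Option` check out of the loop and dispatch once to a const-generic inner function, so the per-row branch compiles away in the `None` case. `count_rows` and `count_rows_dispatch` are hypothetical names, levels are assumed to be `i16`, and the "skip when `d < threshold`" rule is borrowed from the comment in the struct reader rather than from this loop's actual body.

```rust
/// Hypothetical inner loop: counts entries that survive the parent filter.
/// `HAS_PARENT` is a compile-time flag, so when it is `false` the
/// threshold comparison is removed from the generated loop.
fn count_rows<const HAS_PARENT: bool>(def_levels: &[i16], threshold: i16) -> usize {
    let mut rows = 0;
    for &d in def_levels {
        // Assumed rule: entries below the threshold belong to a
        // null/empty parent and are skipped.
        if HAS_PARENT && d < threshold {
            continue;
        }
        rows += 1;
    }
    rows
}

/// Checks the `Option` once per batch instead of once per row.
fn count_rows_dispatch(def_levels: &[i16], parent_threshold: Option<i16>) -> usize {
    match parent_threshold {
        Some(t) => count_rows::<true>(def_levels, t),
        None => count_rows::<false>(def_levels, 0),
    }
}
```

The same pattern extends to a second flag for `validity`, at the cost of four monomorphized copies of the loop.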



##########
parquet/src/arrow/array_reader/list_array.rs:
##########
@@ -97,121 +108,81 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
             .get_rep_levels()
             .ok_or_else(|| general_err!("item_reader rep levels are None."))?;
 
-        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
-            return Err(general_err!(
-                "offset of {} would overflow list array",
-                next_batch_array.len()
-            ));
+        if def_levels.is_empty() {
+            return Ok(new_empty_array(&self.data_type));
         }
 
-        if !rep_levels.is_empty() && rep_levels[0] != 0 {
-            // This implies either the source data was invalid, or the leaf column
-            // reader did not correctly delimit semantic records
+        if rep_levels[0] != 0 {
             return Err(general_err!("first repetition level of batch must be 0"));
         }
 
-        // A non-nullable list has a single definition level indicating if the list is empty

Review Comment:
   FWIW I found this comment helpful context for reading the code, as I am not
   as familiar with repetition and definition levels as I would like. Maybe we
   can restore it to help future readers.



##########
parquet/src/arrow/array_reader/list_array.rs:
##########
@@ -97,121 +108,81 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
             .get_rep_levels()
             .ok_or_else(|| general_err!("item_reader rep levels are None."))?;
 
-        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
-            return Err(general_err!(
-                "offset of {} would overflow list array",
-                next_batch_array.len()
-            ));
+        if def_levels.is_empty() {
+            return Ok(new_empty_array(&self.data_type));
         }
 
-        if !rep_levels.is_empty() && rep_levels[0] != 0 {
-            // This implies either the source data was invalid, or the leaf column
-            // reader did not correctly delimit semantic records
+        if rep_levels[0] != 0 {
             return Err(general_err!("first repetition level of batch must be 0"));
         }
 
-        // A non-nullable list has a single definition level indicating if the list is empty

Review Comment:
   For example, this table might help:
   ```
   For a nullable list, definition levels encode three descending states.
   The list reader's self.def_level helps determine if "the list is present
   and contributes a child value":
   
   ┌───────────────────────────┬────────────────────────────────────┐
   │           State           │             def level              │
   ├───────────────────────────┼────────────────────────────────────┤
   │ List present with a value │ d >= def_level                     │
   ├───────────────────────────┼────────────────────────────────────┤
   │ List present but empty    │ d == def_level - 1                 │
   ├───────────────────────────┼────────────────────────────────────┤
   │ List is null              │ d <= def_level - 2 ← "lower still" │
   └───────────────────────────┴────────────────────────────────────┘
   ```
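
The table above can also be read as a small classification function. This is only a sketch of the reviewer's table for a nullable list, not the reader's actual code: `ListState` and `classify` are hypothetical names, and levels are assumed to be `i16`.

```rust
/// Hypothetical enum mirroring the three rows of the table.
#[derive(Debug, PartialEq)]
enum ListState {
    /// d >= def_level: list present and contributes a child value.
    Value,
    /// d == def_level - 1: list present but empty.
    Empty,
    /// d <= def_level - 2: list is null.
    Null,
}

/// Classify one definition level `d` against the list reader's `def_level`.
fn classify(d: i16, def_level: i16) -> ListState {
    if d >= def_level {
        ListState::Value
    } else if d == def_level - 1 {
        ListState::Empty
    } else {
        ListState::Null
    }
}
```

For example, with `def_level = 2`, the levels 2, 1, and 0 map to `Value`, `Empty`, and `Null` respectively.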



##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -62,6 +62,19 @@ pub struct GenericRecordReader<V, CV> {
     num_records: usize,
     /// Capacity hint for pre-allocating buffers based on batch size
     capacity_hint: usize,
+    /// Number of values in the values buffer (may differ from num_values when
+    /// padding_threshold is set, since list-level padding is excluded).
+    values_written: usize,
+    /// When set, `pad_nulls` only pads item-level nulls (def >= threshold)

Review Comment:
   It took me quite a while to grok exactly what the padding threshold is all
   about. I think the idea is that it is used to avoid materializing child
   values when the parent is null. Is there a good place to document the
   design, I wonder 🤔


