alamb commented on code in PR #8951:
URL: https://github.com/apache/arrow-rs/pull/8951#discussion_r2702698934


##########
arrow-select/src/coalesce/primitive.rs:
##########
@@ -95,6 +96,151 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray
         Ok(())
     }
 
+    /// Copy rows using a predicate
+    fn copy_rows_by_filter(
+        &mut self,
+        filter: &FilterPredicate,
+        offset: usize,
+        len: usize,
+    ) -> Result<(), ArrowError> {
+        self.ensure_capacity();
+
+        let s = self
+            .source
+            .as_ref()
+            .ok_or_else(|| {
+                ArrowError::InvalidArgumentError(
+                    "Internal Error: InProgressPrimitiveArray: source not 
set".to_string(),
+                )
+            })?
+            .slice(offset, len);
+        let s = s.as_primitive::<T>();
+
+        let values = s.values();
+        let count = filter.count();
+
+        // Use the predicate's strategy for optimal iteration
+        match filter.strategy() {
+            IterationStrategy::SlicesIterator => {
+                // Copy values, nulls using slices
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
+                    for (start, end) in 
SlicesIterator::new(filter.filter_array()) {
+                        // SAFETY: slices are derived from filter predicate
+                        self.current
+                            .extend_from_slice(unsafe { 
values.get_unchecked(start..end) });
+                        let slice = nulls.slice(start, end - start);
+                        self.nulls.append_buffer(&slice);
+                    }
+                } else {
+                    for (start, end) in 
SlicesIterator::new(filter.filter_array()) {
+                        // SAFETY: SlicesIterator produces valid ranges 
derived from filter
+                        self.current
+                            .extend_from_slice(unsafe { 
values.get_unchecked(start..end) });
+                    }
+                    self.nulls.append_n_non_nulls(count);
+                }
+            }
+            IterationStrategy::Slices(slices) => {
+                // Copy values and nulls using precomputed slices - single 
iteration
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
+                    for &(start, end) in slices {
+                        // SAFETY: slices are derived from filter predicate
+                        self.current
+                            .extend_from_slice(unsafe { 
values.get_unchecked(start..end) });
+                        let slice = nulls.slice(start, end - start);
+                        self.nulls.append_buffer(&slice);
+                    }
+                } else {
+                    for &(start, end) in slices {
+                        // SAFETY: slices are derived from filter predicate
+                        self.current
+                            .extend_from_slice(unsafe { 
values.get_unchecked(start..end) });
+                    }
+                    self.nulls.append_n_non_nulls(count);
+                }
+            }
+            IterationStrategy::IndexIterator => {
+                // Copy values and nulls for each index
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
+                    let null_buffer = nulls.inner();
+                    let null_ptr = null_buffer.values().as_ptr();
+                    let null_offset = null_buffer.offset();
+
+                    // Collect indices for reuse (values + nulls)
+                    let indices = IndexIterator::new(filter.filter_array(), 
count);
+
+                    // Efficiently extend null buffer
+                    // SAFETY: indices iterator reports correct length
+                    unsafe {
+                        self.nulls.extend_trusted_len(
+                            indices.map(|idx| bit_util::get_bit_raw(null_ptr, 
idx + null_offset)),
+                        );
+                    }
+                    let indices = IndexIterator::new(filter.filter_array(), 
count);
+
+                    // Copy values
+                    // SAFETY: indices are derived from filter predicate
+                    self.current
+                        .extend(indices.map(|idx| unsafe { 
*values.get_unchecked(idx) }));
+                } else {
+                    self.nulls.append_n_non_nulls(count);
+                    let indices = IndexIterator::new(filter.filter_array(), 
count);
+                    // SAFETY: indices are derived from filter predicate
+                    self.current
+                        .extend(indices.map(|idx: usize| unsafe { 
*values.get_unchecked(idx) }));
+                }
+            }
+            IterationStrategy::Indices(indices) => {
+                // Copy values and nulls using precomputed indices
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
+                    let null_buffer = nulls.inner();
+                    let null_ptr = null_buffer.values().as_ptr();
+                    let null_offset = null_buffer.offset();
+
+                    // Efficiently extend null buffer
+                    // SAFETY: indices iterator reports correct length
+                    unsafe {
+                        self.nulls.extend_trusted_len(
+                            indices
+                                .iter()
+                                .map(|&idx| bit_util::get_bit_raw(null_ptr, 
idx + null_offset)),
+                        );
+                    }
+
+                    // Copy values
+                    // SAFETY: indices are derived from filter predicate
+                    self.current.extend(
+                        indices
+                            .iter()
+                            .map(|&idx| unsafe { *values.get_unchecked(idx) }),
+                    );
+                } else {
+                    self.nulls.append_n_non_nulls(count);
+                    // SAFETY: indices are derived from filter predicate
+                    self.current.extend(
+                        indices
+                            .iter()
+                            .map(|&idx| unsafe { *values.get_unchecked(idx) }),
+                    )
+                };
+            }
+            IterationStrategy::All => {
+                // Copy all values

Review Comment:
   While trying to write a test to cover this code, I became pretty sure this 
branch is unreachable: if the entire batch is selected, the fast path above 
is executed instead:
   
   ```rust
           // Fast path: if all rows selected, just push the batch
           if selected_count == num_rows {
               return self.push_batch(batch);
           }
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to