alamb commented on code in PR #8733:
URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2479120538


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1035,8 +1050,85 @@ impl ParquetRecordBatchReader {
         let batch_size = self.batch_size();
         match self.read_plan.selection_mut() {
             Some(selection) => {
+                if selection.is_mask_backed() {
+                    // Stream the record batch reader using contiguous 
segments of the selection
+                    // mask, avoiding the need to materialize intermediate 
`RowSelector` ranges.
+                    loop {
+                        let mask_chunk = match 
selection.next_mask_chunk(batch_size) {
+                            Some(batch) => batch,
+                            None => return Ok(None),
+                        };
+
+                        if mask_chunk.initial_skip > 0 {
+                            let skipped =
+                                
self.array_reader.skip_records(mask_chunk.initial_skip)?;
+                            if skipped != mask_chunk.initial_skip {
+                                return Err(general_err!(
+                                    "failed to skip rows, expected {}, got {}",
+                                    mask_chunk.initial_skip,
+                                    skipped
+                                ));
+                            }
+                        }
+
+                        if mask_chunk.chunk_rows == 0 {
+                            if selection.is_empty() && 
mask_chunk.selected_rows == 0 {
+                                return Ok(None);
+                            }
+                            continue;
+                        }
+
+                        let mask = selection
+                            .mask_values_for(&mask_chunk)
+                            .ok_or_else(|| general_err!("row selection mask 
out of bounds"))?;
+
+                        let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
+                        if read == 0 {
+                            return Err(general_err!(
+                                "reached end of column while expecting {} 
rows",
+                                mask_chunk.chunk_rows
+                            ));
+                        }
+                        if read != mask_chunk.chunk_rows {
+                            return Err(general_err!(
+                                "insufficient rows read from array reader - 
expected {}, got {}",
+                                mask_chunk.chunk_rows,
+                                read
+                            ));
+                        }
+
+                        let array = self.array_reader.consume_batch()?;
+                        // The column reader exposes the projection as a 
struct array; convert this
+                        // into a record batch before applying the boolean 
filter mask.
+                        let struct_array = array.as_struct_opt().ok_or_else(|| 
{
+                            ArrowError::ParquetError(
+                                "Struct array reader should return struct 
array".to_string(),
+                            )
+                        })?;
+
+                        let filtered_batch =
+                            
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
+
+                        if filtered_batch.num_rows() != 
mask_chunk.selected_rows {
+                            return Err(general_err!(
+                                "filtered rows mismatch selection - expected 
{}, got {}",
+                                mask_chunk.selected_rows,
+                                filtered_batch.num_rows()
+                            ));
+                        }
+
+                        if filtered_batch.num_rows() == 0 {
+                            if selection.is_empty() {

Review Comment:
   would it be better to check for empty selection before trying to apply 
filters ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to