tustvold commented on code in PR #8733:
URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2483681768


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1034,9 +1049,82 @@ impl ParquetRecordBatchReader {
         let mut read_records = 0;
         let batch_size = self.batch_size();
         match self.read_plan.selection_mut() {
-            Some(selection) => {
-                while read_records < batch_size && !selection.is_empty() {
-                    let front = selection.pop_front().unwrap();
+            Some(selection_cursor) => {
+                if selection_cursor.is_mask_backed() {
+                    // Stream the record batch reader using contiguous 
segments of the selection
+                    // mask, avoiding the need to materialize intermediate 
`RowSelector` ranges.
+                    while !selection_cursor.is_empty() {
+                        let Some(mask_chunk) = 
selection_cursor.next_mask_chunk(batch_size) else {
+                            return Ok(None);
+                        };
+
+                        if mask_chunk.initial_skip > 0 {
+                            let skipped =
+                                
self.array_reader.skip_records(mask_chunk.initial_skip)?;
+                            if skipped != mask_chunk.initial_skip {
+                                return Err(general_err!(
+                                    "failed to skip rows, expected {}, got {}",
+                                    mask_chunk.initial_skip,
+                                    skipped
+                                ));
+                            }
+                        }
+
+                        if mask_chunk.chunk_rows == 0 {
+                            if selection_cursor.is_empty() && 
mask_chunk.selected_rows == 0 {
+                                return Ok(None);
+                            }
+                            continue;
+                        }
+
+                        let mask = selection_cursor
+                            .mask_values_for(&mask_chunk)
+                            .ok_or_else(|| general_err!("row selection mask 
out of bounds"))?;
+
+                        let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
+                        if read == 0 {
+                            return Err(general_err!(
+                                "reached end of column while expecting {} 
rows",
+                                mask_chunk.chunk_rows
+                            ));
+                        }
+                        if read != mask_chunk.chunk_rows {
+                            return Err(general_err!(
+                                "insufficient rows read from array reader - 
expected {}, got {}",
+                                mask_chunk.chunk_rows,
+                                read
+                            ));
+                        }
+
+                        let array = self.array_reader.consume_batch()?;

Review Comment:
   I think this will cause the reader to produce batches potentially much 
smaller than the requested batch size, which at the very least should be 
documented. Ideally we'd just store the mask somewhere, and flush + filter once 
we have accumulated enough rows.
   
   Otherwise consumers like DataFusion are going to need to wrap ParquetExec in 
a CoalesceBatches operator and effectively buffer all the data again.
   
   Edit: Filtering small batches may also have performance implications - 
the filter kernels are optimized for filtering thousands of rows at a time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to