tustvold commented on code in PR #8733:
URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2483701565


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1034,9 +1049,82 @@ impl ParquetRecordBatchReader {
         let mut read_records = 0;
         let batch_size = self.batch_size();
         match self.read_plan.selection_mut() {
-            Some(selection) => {
-                while read_records < batch_size && !selection.is_empty() {
-                    let front = selection.pop_front().unwrap();
+            Some(selection_cursor) => {
+                if selection_cursor.is_mask_backed() {
+                    // Stream the record batch reader using contiguous 
segments of the selection
+                    // mask, avoiding the need to materialize intermediate 
`RowSelector` ranges.
+                    while !selection_cursor.is_empty() {
+                        let Some(mask_chunk) = 
selection_cursor.next_mask_chunk(batch_size) else {
+                            return Ok(None);
+                        };
+
+                        if mask_chunk.initial_skip > 0 {
+                            let skipped =
+                                
self.array_reader.skip_records(mask_chunk.initial_skip)?;
+                            if skipped != mask_chunk.initial_skip {
+                                return Err(general_err!(
+                                    "failed to skip rows, expected {}, got {}",
+                                    mask_chunk.initial_skip,
+                                    skipped
+                                ));
+                            }
+                        }
+
+                        if mask_chunk.chunk_rows == 0 {
+                            if selection_cursor.is_empty() && 
mask_chunk.selected_rows == 0 {
+                                return Ok(None);
+                            }
+                            continue;
+                        }
+
+                        let mask = selection_cursor
+                            .mask_values_for(&mask_chunk)
+                            .ok_or_else(|| general_err!("row selection mask 
out of bounds"))?;
+
+                        let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;

Review Comment:
   This will likely lead to OOMs for any non-trivial parquet file - it will 
fetch the entire selector (i.e. the entire row group) into memory and then 
filter it



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1034,9 +1049,82 @@ impl ParquetRecordBatchReader {
         let mut read_records = 0;
         let batch_size = self.batch_size();
         match self.read_plan.selection_mut() {
-            Some(selection) => {
-                while read_records < batch_size && !selection.is_empty() {
-                    let front = selection.pop_front().unwrap();
+            Some(selection_cursor) => {
+                if selection_cursor.is_mask_backed() {
+                    // Stream the record batch reader using contiguous 
segments of the selection
+                    // mask, avoiding the need to materialize intermediate 
`RowSelector` ranges.
+                    while !selection_cursor.is_empty() {
+                        let Some(mask_chunk) = 
selection_cursor.next_mask_chunk(batch_size) else {
+                            return Ok(None);
+                        };
+
+                        if mask_chunk.initial_skip > 0 {
+                            let skipped =
+                                
self.array_reader.skip_records(mask_chunk.initial_skip)?;
+                            if skipped != mask_chunk.initial_skip {
+                                return Err(general_err!(
+                                    "failed to skip rows, expected {}, got {}",
+                                    mask_chunk.initial_skip,
+                                    skipped
+                                ));
+                            }
+                        }
+
+                        if mask_chunk.chunk_rows == 0 {
+                            if selection_cursor.is_empty() && 
mask_chunk.selected_rows == 0 {
+                                return Ok(None);
+                            }
+                            continue;
+                        }
+
+                        let mask = selection_cursor
+                            .mask_values_for(&mask_chunk)
+                            .ok_or_else(|| general_err!("row selection mask 
out of bounds"))?;
+
+                        let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
+                        if read == 0 {
+                            return Err(general_err!(
+                                "reached end of column while expecting {} 
rows",
+                                mask_chunk.chunk_rows
+                            ));
+                        }
+                        if read != mask_chunk.chunk_rows {
+                            return Err(general_err!(
+                                "insufficient rows read from array reader - 
expected {}, got {}",
+                                mask_chunk.chunk_rows,
+                                read
+                            ));
+                        }
+
+                        let array = self.array_reader.consume_batch()?;

Review Comment:
   I think this will cause the reader to produce batches potentially much 
smaller than the requested batch size, which at the very least should be 
documented. Ideally we'd just store the mask somewhere, and flush + filter when 
we have enough rows. 
   
   Otherwise consumers like DataFusion are going to need to wrap ParquetExec in 
a CoalesceBatches and effectively buffer all the data again.
   
   ~Edit edit: Filtering small batches may also have performance implications - 
the filter kernels are optimized for filtering thousands of rows at a time.~
   
   Edit edit edit: actually realised the selector is for the entire row group!!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to