hhhizzz commented on code in PR #8733:
URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2480526458
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1035,8 +1050,85 @@ impl ParquetRecordBatchReader {
let batch_size = self.batch_size();
match self.read_plan.selection_mut() {
Some(selection) => {
+ if selection.is_mask_backed() {
+ // Stream the record batch reader using contiguous
segments of the selection
+ // mask, avoiding the need to materialize intermediate
`RowSelector` ranges.
+ loop {
+ let mask_chunk = match
selection.next_mask_chunk(batch_size) {
+ Some(batch) => batch,
+ None => return Ok(None),
+ };
+
+ if mask_chunk.initial_skip > 0 {
+ let skipped =
+
self.array_reader.skip_records(mask_chunk.initial_skip)?;
+ if skipped != mask_chunk.initial_skip {
+ return Err(general_err!(
+ "failed to skip rows, expected {}, got {}",
+ mask_chunk.initial_skip,
+ skipped
+ ));
+ }
+ }
+
+ if mask_chunk.chunk_rows == 0 {
+ if selection.is_empty() &&
mask_chunk.selected_rows == 0 {
+ return Ok(None);
+ }
+ continue;
+ }
+
+ let mask = selection
+ .mask_values_for(&mask_chunk)
+ .ok_or_else(|| general_err!("row selection mask
out of bounds"))?;
+
+ let read =
self.array_reader.read_records(mask_chunk.chunk_rows)?;
+ if read == 0 {
+ return Err(general_err!(
+ "reached end of column while expecting {}
rows",
+ mask_chunk.chunk_rows
+ ));
+ }
+ if read != mask_chunk.chunk_rows {
+ return Err(general_err!(
+ "insufficient rows read from array reader -
expected {}, got {}",
+ mask_chunk.chunk_rows,
+ read
+ ));
+ }
+
+ let array = self.array_reader.consume_batch()?;
+ // The column reader exposes the projection as a
struct array; convert this
+ // into a record batch before applying the boolean
filter mask.
+ let struct_array = array.as_struct_opt().ok_or_else(||
{
+ ArrowError::ParquetError(
+ "Struct array reader should return struct
array".to_string(),
+ )
+ })?;
+
+ let filtered_batch =
+
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
+
+ if filtered_batch.num_rows() !=
mask_chunk.selected_rows {
+ return Err(general_err!(
+ "filtered rows mismatch selection - expected
{}, got {}",
+ mask_chunk.selected_rows,
+ filtered_batch.num_rows()
+ ));
+ }
+
+ if filtered_batch.num_rows() == 0 {
+ if selection.is_empty() {
Review Comment:
Yes, updated to use a while loop to check whether the selection is empty.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]