alamb commented on code in PR #9118:
URL: https://github.com/apache/arrow-rs/pull/9118#discussion_r2718285149
##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -588,6 +582,23 @@ impl RowGroupReaderBuilder {
&mut self.buffers,
)?;
+ // Before the plan is built below, check whether the plan uses a bitmask
+ // selection and, if so, capture the page offsets in a variable.
+ let page_offsets = if plan_builder.resolve_selection_strategy()
+ == RowSelectionStrategy::Mask
+ && plan_builder.selection().is_some_and(|selection| {
+ selection.requires_page_aware_mask(
+ &self.projection,
+ self.row_group_offset_index(row_group_idx),
+ )
+ }) {
+ self.row_group_offset_index(row_group_idx)
+ .and_then(|columns| columns.first())
Review Comment:
I think this is a bug -- it reads the page offsets from the first column rather than the column being read.
Maybe something like
```rust
self.row_group_offset_index(row_group_idx).and_then(|columns| {
    columns
        .iter()
        .enumerate()
        .find(|(leaf_idx, _)| self.projection.leaf_included(*leaf_idx))
        .map(|(_, column)| column.page_locations())
})
```
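i.e., take the offset index from the first leaf column that is actually included in the projection, rather than unconditionally from `columns.first()`.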
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -5661,12 +5676,235 @@ pub(crate) mod tests {
.build()
.unwrap();
- // Predicate pruning used to panic once mask-backed plans removed whole pages.
- // Collecting into batches validates the plan now downgrades to selectors instead.
let schema = reader.schema().clone();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let result = concat_batches(&schema, &batches).unwrap();
assert_eq!(result.num_rows(), 2);
+ assert_eq!(
+ result.column(0).as_ref(),
+ &Int64Array::from(vec![first_value, last_value])
+ );
+ assert_eq!(
+ result.column(1).as_ref(),
+ &Int64Array::from(vec![first_value, last_value])
+ );
+ }
+
+ /// Test that bitmask-based row selection correctly handles page boundaries.
+ /// This test creates a parquet file with multiple small pages and verifies that
+ /// when using Mask policy, pages that are skipped entirely are handled correctly.
+ #[test]
+ fn test_bitmask_page_aware_selection() {
+ let first_value: i64 = 1111;
+ let last_value: i64 = 9999;
+ let num_rows: usize = 20;
+
+ // Create a file with 20 rows, ~2 rows per page = 10 pages
+ // Selection will be: first row, skip middle rows, last row
+ // This forces the reader to handle skipped pages correctly
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("key", arrow_schema::DataType::Int64, false),
+ Field::new("value", arrow_schema::DataType::Int64, false),
+ ]));
+
+ let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+ int_values[0] = first_value;
+ int_values[num_rows - 1] = last_value;
+ let keys = Int64Array::from(int_values.clone());
+ let values = Int64Array::from(int_values.clone());
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+ )
+ .unwrap();
+
+ // Configure small pages to create multiple page boundaries
+ let props = WriterProperties::builder()
+ .set_write_batch_size(2)
+ .set_data_page_row_count_limit(2)
+ .build();
+
+ let mut buffer = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ let data = Bytes::from(buffer);
+
+ let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true));
Review Comment:
maybe PageIndexPolicy::Required would be clearer 🤔
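i.e., something like the following (assuming `PageIndexPolicy::from(true)` resolves to the same policy):
```rust
// Hypothetical rewrite of the line above; the explicit variant states the intent directly
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
```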
##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -603,7 +611,8 @@ impl RowGroupReaderBuilder {
.build_array_reader(self.fields.as_deref(), &self.projection)
}?;
- let reader = ParquetRecordBatchReader::new(array_reader, plan);
+ let reader =
+ ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned());
Review Comment:
I agree with @hhhizzz that copying the offsets here is not good.
I thought about it some more, and I think the reason the copy is currently needed is that the decision of whether a page should be skipped is postponed until the next MaskChunk is needed.
One potential idea to avoid this is to use the page index in the ReadPlanBuilder when building, rather than passing the page index into every call to next_batch.
So maybe that would look something like extending MaskCursor from
```rust
/// Cursor for iterating a mask-backed [`RowSelection`]
///
/// This is best for dense selections where there are many small skips
/// or selections. For example, selecting every other row.
#[derive(Debug)]
pub struct MaskCursor {
mask: BooleanBuffer,
/// Current absolute offset into the selection
position: usize,
}
```
to also track which ranges should be skipped entirely. Maybe something like:
```rust
use std::ops::Range;

#[derive(Debug)]
pub struct MaskCursor {
mask: BooleanBuffer,
/// Current absolute offset into the selection
position: usize,
/// Which row ranges should be skipped entirely?
skip_ranges: Vec<Range<usize>>,
}
```
I think that would simplify the logic for `next_mask_chunk` significantly, and it would avoid the need to copy the entire page index. A sketch of how the cursor might consume those ranges follows below.
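For illustration, a minimal sketch of how a cursor with `skip_ranges` might be consumed (the helper name is invented, not part of the PR; assumes the ranges are sorted and non-overlapping):
```rust
impl MaskCursor {
    /// If a skip range starts at the current position, jump past it and return
    /// the number of rows skipped without decoding; otherwise return 0.
    /// (Illustrative only, not the PR's actual implementation.)
    fn skip_if_at_range_start(&mut self) -> usize {
        if let Some(range) = self.skip_ranges.first() {
            if range.start == self.position {
                let end = range.end;
                let skipped = end - self.position;
                self.position = end;
                // A VecDeque would make this O(1); a Vec keeps the sketch simple
                self.skip_ranges.remove(0);
                return skipped;
            }
        }
        0
    }
}
```
`next_mask_chunk` could then call this helper first and report skipped rows to the reader, instead of consulting the page index on every call.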
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1375,8 +1377,11 @@ impl ParquetRecordBatchReader {
RowSelectionCursor::Mask(mask_cursor) => {
// Stream the record batch reader using contiguous segments of the selection
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
+ let page_locations = self.page_offsets.as_deref();
+
while !mask_cursor.is_empty() {
- let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
+ let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations)
Review Comment:
I expect that this API needs to be extended -- it needs to be able to represent "skip the next N rows without trying to decode them".
As written here, I think the first page that doesn't have any rows selected will return None (which will trigger the reader to think it is at the end of the file, even if there is data left). One hypothetical shape for the extended API is sketched below.
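For illustration, one possible shape for such an extended return type (the names are invented, not the PR's API):
```rust
/// What the cursor should do next: decode some rows, or skip ahead entirely.
/// Illustrative sketch only; `MaskChunk` is the chunk type used by the PR.
enum MaskStep {
    /// Decode a batch of rows, filtering with this chunk of the mask
    Read(MaskChunk),
    /// Skip the next N rows without decoding them (e.g. a fully deselected page)
    Skip(usize),
}
```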