adriangb commented on code in PR #9968:
URL: https://github.com/apache/arrow-rs/pull/9968#discussion_r3251198721
##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -1476,6 +1770,310 @@ mod test {
expect_finished(decoder.try_decode());
}
+ /// `into_builder` between row groups recovers a builder for the
+ /// not-yet-decoded row groups; rebuilding it with a new row filter
+ /// applies that filter to the subsequent row groups while leaving the
+ /// already-decoded row group's results untouched.
+ ///
+ /// Adaptive callers should drive the decoder with `try_next_reader`
+ /// rather than `try_decode`: `try_next_reader` returns once per row
+ /// group, giving the caller a clean window between two consecutive
+ /// returns to inspect stats and reconfigure the scan. `try_decode`
+ /// barrels through row-group boundaries and is unsuitable for in-flight
+ /// strategy changes.
+ #[test]
+ fn test_into_builder_installs_filter_between_row_groups() {
+ let metadata = test_file_parquet_metadata();
+ let schema_descr = metadata.file_metadata().schema_descr_ptr();
+
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
+ .unwrap()
+ .with_batch_size(1024)
+ .build()
+ .unwrap();
+ decoder
+ .push_range(test_file_range(), TEST_FILE_DATA.clone())
+ .unwrap();
+
+ // Reader for row group 0 — no filter.
+ let reader0 = expect_data(decoder.try_next_reader());
+ let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
+ let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
+ assert_eq!(batch0, TEST_BATCH.slice(0, 200));
+
+ // We're between row groups now. Rebuild with a filter on column "a".
+ assert!(decoder.is_at_row_group_boundary());
+ assert_eq!(decoder.row_groups_remaining(), 1);
+ let filter =
+ ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| {
+ gt(batch.column(0), &Int64Array::new_scalar(250))
+ });
+ let mut decoder = decoder
+ .into_builder()
+ .unwrap()
+ .with_row_filter(RowFilter::new(vec![Box::new(filter)]))
+ .build()
+ .unwrap();
+
+ // Reader for row group 1 — filter applied. The rebuilt decoder kept
+ the buffered bytes (see `test_into_builder_preserves_buffered_bytes`)
+ // so no data needs to be re-supplied. Column "a" in RG1 has values
+ // 200..399; `a > 250` keeps 251..399 = 149 rows.
+ let reader1 = expect_data(decoder.try_next_reader());
+ let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
+ let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
+ assert_eq!(batch1, TEST_BATCH.slice(251, 149));
+ expect_finished(decoder.try_next_reader());
+ }
+
+ /// `into_builder` is rejected while a row group's reader is being
+ /// drained (`DecodingRowGroup`); the error points at
+ /// `is_at_row_group_boundary`.
+ #[test]
+ fn test_into_builder_rejected_mid_row_group() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_batch_size(50)
+ .build()
+ .unwrap();
+ decoder
+ .push_range(test_file_range(), TEST_FILE_DATA.clone())
+ .unwrap();
+
+ // After getting the first batch, we're inside `DecodingRowGroup`:
+ // an active reader is still alive. Mid-reader is not a boundary.
+ let _ = expect_data(decoder.try_decode());
Review Comment:
nit: the `let _ =` binding can be dropped here — a bare `expect_data(decoder.try_decode());` statement is sufficient.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]