This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 6f4d88769f [Parquet] test adaptive predicate pushdown with skipped page (#9243)
6f4d88769f is described below
commit 6f4d88769fc20178e644e5dd2360b50ff50aed1a
Author: Adam Curtis <[email protected]>
AuthorDate: Thu Jan 22 13:38:09 2026 -0500
[Parquet] test adaptive predicate pushdown with skipped page (#9243)
- Regression test for https://github.com/apache/arrow-rs/issues/9239
Remove the `should_panic` when the issue is fixed.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/async_reader/mod.rs | 148 +++++++++++++++++++++++++++++++++-
1 file changed, 147 insertions(+), 1 deletion(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 38eef7343e..539a3b2b51 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -786,7 +786,7 @@ mod tests {
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
- use arrow_array::types::Int32Type;
+ use arrow_array::types::{Int32Type, TimestampNanosecondType};
use arrow_array::{
Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
Scalar, StringArray, StructArray, UInt64Array,
@@ -2316,4 +2316,150 @@ mod tests {
Ok(())
}
+
+ /// Regression test for adaptive predicate pushdown attempting to read skipped pages.
+ /// Related issue: https://github.com/apache/arrow-rs/issues/9239
+ /// TODO: Remove should_panic once the bug is fixed
+ #[tokio::test]
+ #[should_panic(expected = "Invalid offset in sparse column chunk data")]
+ async fn test_predicate_pushdown_with_skipped_pages() {
+ use arrow_array::TimestampNanosecondArray;
+ use arrow_schema::TimeUnit;
+
+ // Time range constants
+ const TIME_IN_RANGE_START: i64 = 1_704_092_400_000_000_000; // 2024-01-01T07:00:00Z
+ const TIME_IN_RANGE_END: i64 = 1_704_110_400_000_000_000; // 2024-01-01T12:00:00Z
+ const TIME_BEFORE_RANGE: i64 = 1_704_078_000_000_000_000; // 2024-01-01T03:00:00Z
+
+ // Create test data: 2 row groups, 300 rows each
+ // "tag" column: 'a', 'b', 'c' (100 rows each, sorted)
+ // "time" column: alternating in-range/out-of-range timestamps
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "time",
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ false,
+ ),
+ Field::new("tag", DataType::Utf8, false),
+ ]));
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_data_page_row_count_limit(33)
+ .build();
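+ // With 300-row groups and a 33-row page limit, each column chunk is
+ // written as ~10 small data pages, so skipping rows can skip whole pages.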
+
+ let mut buffer = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+
+ // Write 2 row groups
+ for _ in 0..2 {
+ for (tag_idx, tag) in ["a", "b", "c"].iter().enumerate() {
+ let times: Vec<i64> = (0..100)
+ .map(|j| {
+ let row_idx = tag_idx * 100 + j;
+ if row_idx % 2 == 0 {
+ TIME_IN_RANGE_START + (j as i64 * 1_000_000)
+ } else {
+ TIME_BEFORE_RANGE + (j as i64 * 1_000_000)
+ }
+ })
+ .collect();
+ let tags: Vec<&str> = (0..100).map(|_| *tag).collect();
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(TimestampNanosecondArray::from(times)) as ArrayRef,
+ Arc::new(StringArray::from(tags)) as ArrayRef,
+ ],
+ )
+ .unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.flush().unwrap();
+ }
+ writer.close().unwrap();
+ let buffer = Bytes::from(buffer);
+ // Read back with each page index policy; all should produce the same answer
+ for policy in [
+ PageIndexPolicy::Skip,
+ PageIndexPolicy::Optional,
+ PageIndexPolicy::Required,
+ ] {
+ println!("Testing with page index policy: {:?}", policy);
+ let reader = TestReader::new(buffer.clone());
+ let options = ArrowReaderOptions::default().with_page_index_policy(policy);
+ let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+ .await
+ .unwrap();
+
+ let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+ let num_row_groups = builder.metadata().num_row_groups();
+
+ // Initial selection: skip middle 100 rows (tag='b') per row group
+ let mut selectors = Vec::new();
+ for _ in 0..num_row_groups {
+ selectors.push(RowSelector::select(100));
+ selectors.push(RowSelector::skip(100));
+ selectors.push(RowSelector::select(100));
+ }
+ let selection = RowSelection::from(selectors);
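+ // The skipped 'b' rows mean their data pages are never fetched, leaving
+ // sparse column chunk data behind them (see issue #9239).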
+
+ // Predicate 1: tag in ('a', 'c')
+ let tag_predicate =
+ ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [1]), |batch| {
+ let col = batch.column(0).as_string::<i32>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|t| t.map(|v| v == "a" || v == "c")),
+ ))
+ });
+
+ // Predicate 2: time >= START
+ let time_gte_predicate =
+ ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+ let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|t| t.map(|v| v >= TIME_IN_RANGE_START)),
+ ))
+ });
+
+ // Predicate 3: time < END
+ let time_lt_predicate =
+ ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+ let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|t| t.map(|v| v < TIME_IN_RANGE_END)),
+ ))
+ });
+
+ let row_filter = RowFilter::new(vec![
+ Box::new(tag_predicate),
+ Box::new(time_gte_predicate),
+ Box::new(time_lt_predicate),
+ ]);
+
+ // Output projection: Only tag column (time not in output)
+ let projection = ProjectionMask::roots(&schema_descr, [1]);
+
+ let stream = builder
+ .with_row_filter(row_filter)
+ .with_row_selection(selection)
+ .with_projection(projection)
+ .build()
+ .unwrap();
+
+ // The stream should complete without error and yield the same results for every policy
+ let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+ let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
+ assert_eq!(batch.num_columns(), 1);
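+ // Per row group the selection keeps the 100 'a' and 100 'c' rows, and the
+ // time predicates keep the even-indexed (in-range) half of each.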
+ let expected = StringArray::from_iter_values(
+ std::iter::repeat_n("a", 50)
+ .chain(std::iter::repeat_n("c", 50))
+ .chain(std::iter::repeat_n("a", 50))
+ .chain(std::iter::repeat_n("c", 50)),
+ );
+ assert_eq!(batch.column(0).as_string(), &expected);
+ }
+ }
}