This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f4d88769f [Parquet] test adaptive predicate pushdown with skipped page (#9243)
6f4d88769f is described below

commit 6f4d88769fc20178e644e5dd2360b50ff50aed1a
Author: Adam Curtis <[email protected]>
AuthorDate: Thu Jan 22 13:38:09 2026 -0500

    [Parquet] test adaptive predicate pushdown with skipped page (#9243)
    
    - Regression test for https://github.com/apache/arrow-rs/issues/9239
    
    Remove the `should_panic` when the issue is fixed.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/arrow/async_reader/mod.rs | 148 +++++++++++++++++++++++++++++++++-
 1 file changed, 147 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 38eef7343e..539a3b2b51 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -786,7 +786,7 @@ mod tests {
     use arrow::error::Result as ArrowResult;
     use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
-    use arrow_array::types::Int32Type;
+    use arrow_array::types::{Int32Type, TimestampNanosecondType};
     use arrow_array::{
         Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
         Scalar, StringArray, StructArray, UInt64Array,
@@ -2316,4 +2316,150 @@ mod tests {
 
         Ok(())
     }
+
+    /// Regression test for adaptive predicate pushdown attempting to read skipped pages.
+    /// Related issue: https://github.com/apache/arrow-rs/issues/9239
+    #[tokio::test]
+    /// TODO: Remove should_panic once the bug is fixed
+    #[should_panic(expected = "Invalid offset in sparse column chunk data")]
+    async fn test_predicate_pushdown_with_skipped_pages() {
+        use arrow_array::TimestampNanosecondArray;
+        use arrow_schema::TimeUnit;
+
+        // Time range constants, in nanoseconds since the Unix epoch.
+        // All fall on 2024-01-01 UTC: the in-range window is [07:00, 12:00)
+        // and the "before" value is 03:00.
+        const TIME_IN_RANGE_START: i64 = 1_704_092_400_000_000_000;
+        const TIME_IN_RANGE_END: i64 = 1_704_110_400_000_000_000;
+        const TIME_BEFORE_RANGE: i64 = 1_704_078_000_000_000_000;
+
+        // Create test data: 2 row groups, 300 rows each
+        // "tag" column: 'a', 'b', 'c' (100 rows each, sorted)
+        // "time" column: alternating in-range/out-of-range timestamps
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            Field::new("tag", DataType::Utf8, false),
+        ]));
+
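+        // A low data page row count limit yields roughly ten pages per
+        // 300-row column chunk, small enough for individual pages to be
+        // skipped when a page index is present.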
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(300)
+            .set_data_page_row_count_limit(33)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+
+        // Write 2 row groups
+        for _ in 0..2 {
+            for (tag_idx, tag) in ["a", "b", "c"].iter().enumerate() {
+                let times: Vec<i64> = (0..100)
+                    .map(|j| {
+                        let row_idx = tag_idx * 100 + j;
+                        if row_idx % 2 == 0 {
+                            TIME_IN_RANGE_START + (j as i64 * 1_000_000)
+                        } else {
+                            TIME_BEFORE_RANGE + (j as i64 * 1_000_000)
+                        }
+                    })
+                    .collect();
+                let tags: Vec<&str> = (0..100).map(|_| *tag).collect();
+
+                let batch = RecordBatch::try_new(
+                    schema.clone(),
+                    vec![
+                        Arc::new(TimestampNanosecondArray::from(times)) as ArrayRef,
+                        Arc::new(StringArray::from(tags)) as ArrayRef,
+                    ],
+                )
+                .unwrap();
+                writer.write(&batch).unwrap();
+            }
+            writer.flush().unwrap();
+        }
+        writer.close().unwrap();
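+        // close() writes the Parquet footer; without it the buffer would not
+        // be a readable file.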
+        let buffer = Bytes::from(buffer);
+        // Read back with each page index policy; all should produce the same result.
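+        // (Skip never reads the page index, Optional uses it when present,
+        // and Required errors out if it is missing.)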
+        for policy in [
+            PageIndexPolicy::Skip,
+            PageIndexPolicy::Optional,
+            PageIndexPolicy::Required,
+        ] {
+            println!("Testing with page index policy: {:?}", policy);
+            let reader = TestReader::new(buffer.clone());
+            let options = ArrowReaderOptions::default().with_page_index_policy(policy);
+            let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+                .await
+                .unwrap();
+
+            let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+            let num_row_groups = builder.metadata().num_row_groups();
+
+            // Initial selection: skip middle 100 rows (tag='b') per row group
+            let mut selectors = Vec::new();
+            for _ in 0..num_row_groups {
+                selectors.push(RowSelector::select(100));
+                selectors.push(RowSelector::skip(100));
+                selectors.push(RowSelector::select(100));
+            }
+            let selection = RowSelection::from(selectors);
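+            // Skipping all of tag='b' means the middle pages of each column
+            // chunk are never fetched, producing the "sparse column chunk
+            // data" that issue #9239 trips over.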
+
+            // Predicate 1: tag in ('a', 'c')
+            let tag_predicate =
+                ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [1]), |batch| {
+                    let col = batch.column(0).as_string::<i32>();
+                    Ok(BooleanArray::from_iter(
+                        col.iter().map(|t| t.map(|v| v == "a" || v == "c")),
+                    ))
+                });
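+            // (Each ArrowPredicateFn pairs a projection onto the filter
+            // column with a closure that computes a boolean mask.)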
+
+            // Predicate 2: time >= START
+            let time_gte_predicate =
+                ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+                    let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
+                    Ok(BooleanArray::from_iter(
+                        col.iter().map(|t| t.map(|v| v >= TIME_IN_RANGE_START)),
+                    ))
+                });
+
+            // Predicate 3: time < END
+            let time_lt_predicate =
+                ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+                    let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
+                    Ok(BooleanArray::from_iter(
+                        col.iter().map(|t| t.map(|v| v < TIME_IN_RANGE_END)),
+                    ))
+                });
+
+            let row_filter = RowFilter::new(vec![
+                Box::new(tag_predicate),
+                Box::new(time_gte_predicate),
+                Box::new(time_lt_predicate),
+            ]);
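+            // RowFilter evaluates its predicates in order; rows dropped by
+            // an earlier predicate are not decoded for later ones.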
+
+            // Output projection: Only tag column (time not in output)
+            let projection = ProjectionMask::roots(&schema_descr, [1]);
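+            // "time" is decoded only while evaluating the filter and never
+            // materialized in the output, which exercises the adaptive
+            // predicate pushdown path under test.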
+
+            let stream = builder
+                .with_row_filter(row_filter)
+                .with_row_selection(selection)
+                .with_projection(projection)
+                .build()
+                .unwrap();
+
+            // The stream should complete without error and return the same
+            // results under every policy.
+            let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+            let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
+            assert_eq!(batch.num_columns(), 1);
+            let expected = StringArray::from_iter_values(
+                std::iter::repeat_n("a", 50)
+                    .chain(std::iter::repeat_n("c", 50))
+                    .chain(std::iter::repeat_n("a", 50))
+                    .chain(std::iter::repeat_n("c", 50)),
+            );
+            assert_eq!(batch.column(0).as_string(), &expected);
+        }
+    }
 }
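
To run just this test locally, something like the following should work (the
feature flags here are an assumption and may vary):

    cargo test -p parquet --all-features test_predicate_pushdown_with_skipped_pages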
