This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 72619096fc Minor: Add a parquet row_filter test, reduce some test boiler plate (#7522)
72619096fc is described below

commit 72619096fc5da4ba4f1778d0ed21f6007db22c39
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 20 12:46:14 2025 -0400

    Minor: Add a parquet row_filter test, reduce some test boiler plate (#7522)
    
    * Add single row filter test
    
    * DRY
    
    * Update parquet/src/arrow/async_reader/mod.rs
    
    * Improve comments for other tests
---
 parquet/src/arrow/async_reader/mod.rs | 170 +++++++++++++++++++---------------
 1 file changed, 95 insertions(+), 75 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 0c38d36a5b..ac4d24ee27 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1135,6 +1135,16 @@ mod tests {
         requests: Arc<Mutex<Vec<Range<usize>>>>,
     }
 
+    impl TestReader {
+        fn new(data: Bytes) -> Self {
+            Self {
+                data,
+                metadata: Default::default(),
+                requests: Default::default(),
+            }
+        }
+    }
+
     impl AsyncFileReader for TestReader {
         fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, 
Result<Bytes>> {
             let range = range.clone();
@@ -1167,11 +1177,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let requests = async_reader.requests.clone();
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -1220,11 +1226,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let requests = async_reader.requests.clone();
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -1281,11 +1283,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = 
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1350,11 +1348,7 @@ mod tests {
 
         assert_eq!(metadata.num_row_groups(), 1);
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
@@ -1391,11 +1385,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = 
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1469,11 +1459,7 @@ mod tests {
 
             let selection = RowSelection::from(selectors);
 
-            let async_reader = TestReader {
-                data: data.clone(),
-                metadata: Default::default(),
-                requests: Default::default(),
-            };
+            let async_reader = TestReader::new(data.clone());
 
             let options = ArrowReaderOptions::new().with_page_index(true);
             let builder = 
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1535,11 +1521,7 @@ mod tests {
 
         let selection = RowSelection::from(selectors);
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = 
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1566,6 +1548,70 @@ mod tests {
 
     #[tokio::test]
     async fn test_row_filter() {
+        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+        let data = RecordBatch::try_from_iter([
+            ("a", Arc::new(a) as ArrayRef),
+            ("b", Arc::new(b) as ArrayRef),
+        ])
+        .unwrap();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), 
None).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data: Bytes = buf.into();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        let test = TestReader::new(data);
+        let requests = test.requests.clone();
+
+        let a_scalar = StringArray::from_iter_values(["b"]);
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![0]),
+            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
+        let stream = ParquetRecordBatchStreamBuilder::new(test)
+            .await
+            .unwrap()
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .with_row_filter(filter)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_columns(), 2);
+
+        // Filter should have kept only rows with "b" in column 0
+        assert_eq!(
+            batch.column(0).as_ref(),
+            &StringArray::from_iter_values(["b", "b", "b"])
+        );
+        assert_eq!(
+            batch.column(1).as_ref(),
+            &StringArray::from_iter_values(["2", "3", "4"])
+        );
+
+        // Should only have made 2 requests:
+        // * First request fetches data for evaluating the predicate
+        // * Second request fetches data for evaluating the projection
+        assert_eq!(requests.lock().unwrap().len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_two_row_filters() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
         let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
         let c = Int32Array::from_iter(0..6);
@@ -1587,11 +1633,7 @@ mod tests {
             .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
 
-        let test = TestReader {
-            data,
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let test = TestReader::new(data);
         let requests = test.requests.clone();
 
         let a_scalar = StringArray::from_iter_values(["b"]);
@@ -1634,6 +1676,9 @@ mod tests {
         assert_eq!(val, 3);
 
         // Should only have made 3 requests
+        // * First request fetches data for evaluating the first predicate
+        // * Second request fetches data for evaluating the second predicate
+        // * Third request fetches data for evaluating the projection
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
 
@@ -1664,11 +1709,7 @@ mod tests {
 
         assert_eq!(metadata.num_row_groups(), 2);
 
-        let test = TestReader {
-            data,
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let test = TestReader::new(data);
 
         let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
             .await
@@ -1755,11 +1796,7 @@ mod tests {
 
         assert_eq!(metadata.num_row_groups(), 1);
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let a_filter =
             ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, 
vec![1]), |batch| {
@@ -1823,11 +1860,7 @@ mod tests {
 
         assert_eq!(metadata.num_row_groups(), 1);
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let requests = async_reader.requests.clone();
         let (_, fields) = parquet_to_arrow_schema_and_fields(
@@ -1893,11 +1926,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
@@ -2036,11 +2065,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
             .unwrap();
@@ -2063,11 +2088,7 @@ mod tests {
     }
 
     async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: 
bool) {
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
 
         let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
@@ -2206,11 +2227,7 @@ mod tests {
             .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
 
-        let test = TestReader {
-            data,
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let test = TestReader::new(data);
         let requests = test.requests.clone();
 
         let a_scalar = StringArray::from_iter_values(["b"]);
@@ -2261,6 +2278,9 @@ mod tests {
         assert_eq!(val, 3);
 
         // Should only have made 3 requests
+        // * First request fetches data for evaluating the first predicate
+        // * Second request fetches data for evaluating the second predicate
+        // * Third request fetches data for evaluating the projection
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
 

Reply via email to