This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 72619096fc Minor: Add a parquet row_filter test, reduce some test
boilerplate (#7522)
72619096fc is described below
commit 72619096fc5da4ba4f1778d0ed21f6007db22c39
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 20 12:46:14 2025 -0400
Minor: Add a parquet row_filter test, reduce some test boilerplate (#7522)
* Add single row filter test
* DRY
* Update parquet/src/arrow/async_reader/mod.rs
* Improve comments for other tests
---
parquet/src/arrow/async_reader/mod.rs | 170 +++++++++++++++++++---------------
1 file changed, 95 insertions(+), 75 deletions(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index 0c38d36a5b..ac4d24ee27 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1135,6 +1135,16 @@ mod tests {
requests: Arc<Mutex<Vec<Range<usize>>>>,
}
+ impl TestReader {
+ fn new(data: Bytes) -> Self {
+ Self {
+ data,
+ metadata: Default::default(),
+ requests: Default::default(),
+ }
+ }
+ }
+
impl AsyncFileReader for TestReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_,
Result<Bytes>> {
let range = range.clone();
@@ -1167,11 +1177,7 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -1220,11 +1226,7 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -1281,11 +1283,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1350,11 +1348,7 @@ mod tests {
assert_eq!(metadata.num_row_groups(), 1);
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
@@ -1391,11 +1385,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1469,11 +1459,7 @@ mod tests {
let selection = RowSelection::from(selectors);
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1535,11 +1521,7 @@ mod tests {
let selection = RowSelection::from(selectors);
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1566,6 +1548,70 @@ mod tests {
#[tokio::test]
async fn test_row_filter() {
+ let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+ let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+ let data = RecordBatch::try_from_iter([
+ ("a", Arc::new(a) as ArrayRef),
+ ("b", Arc::new(b) as ArrayRef),
+ ])
+ .unwrap();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, data.schema(),
None).unwrap();
+ writer.write(&data).unwrap();
+ writer.close().unwrap();
+
+ let data: Bytes = buf.into();
+ let metadata = ParquetMetaDataReader::new()
+ .parse_and_finish(&data)
+ .unwrap();
+ let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+ let test = TestReader::new(data);
+ let requests = test.requests.clone();
+
+ let a_scalar = StringArray::from_iter_values(["b"]);
+ let a_filter = ArrowPredicateFn::new(
+ ProjectionMask::leaves(&parquet_schema, vec![0]),
+ move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
+ );
+
+ let filter = RowFilter::new(vec![Box::new(a_filter)]);
+
+ let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
+ let stream = ParquetRecordBatchStreamBuilder::new(test)
+ .await
+ .unwrap()
+ .with_projection(mask.clone())
+ .with_batch_size(1024)
+ .with_row_filter(filter)
+ .build()
+ .unwrap();
+
+ let batches: Vec<_> = stream.try_collect().await.unwrap();
+ assert_eq!(batches.len(), 1);
+
+ let batch = &batches[0];
+ assert_eq!(batch.num_columns(), 2);
+
+ // Filter should have kept only rows with "b" in column 0
+ assert_eq!(
+ batch.column(0).as_ref(),
+ &StringArray::from_iter_values(["b", "b", "b"])
+ );
+ assert_eq!(
+ batch.column(1).as_ref(),
+ &StringArray::from_iter_values(["2", "3", "4"])
+ );
+
+ // Should only have made 2 requests:
+ // * First request fetches data for evaluating the predicate
+ // * Second request fetches data for evaluating the projection
+ assert_eq!(requests.lock().unwrap().len(), 2);
+ }
+
+ #[tokio::test]
+ async fn test_two_row_filters() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
let c = Int32Array::from_iter(0..6);
@@ -1587,11 +1633,7 @@ mod tests {
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
- let test = TestReader {
- data,
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let test = TestReader::new(data);
let requests = test.requests.clone();
let a_scalar = StringArray::from_iter_values(["b"]);
@@ -1634,6 +1676,9 @@ mod tests {
assert_eq!(val, 3);
// Should only have made 3 requests
+ // * First request fetches data for evaluating the first predicate
+ // * Second request fetches data for evaluating the second predicate
+ // * Third request fetches data for evaluating the projection
assert_eq!(requests.lock().unwrap().len(), 3);
}
@@ -1664,11 +1709,7 @@ mod tests {
assert_eq!(metadata.num_row_groups(), 2);
- let test = TestReader {
- data,
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let test = TestReader::new(data);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
@@ -1755,11 +1796,7 @@ mod tests {
assert_eq!(metadata.num_row_groups(), 1);
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let a_filter =
ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema,
vec![1]), |batch| {
@@ -1823,11 +1860,7 @@ mod tests {
assert_eq!(metadata.num_row_groups(), 1);
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let (_, fields) = parquet_to_arrow_schema_and_fields(
@@ -1893,11 +1926,7 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
@@ -2036,11 +2065,7 @@ mod tests {
let testdata = arrow::util::test_util::parquet_test_data();
let path =
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
@@ -2063,11 +2088,7 @@ mod tests {
}
async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length:
bool) {
- let async_reader = TestReader {
- data: data.clone(),
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let async_reader = TestReader::new(data.clone());
let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
@@ -2206,11 +2227,7 @@ mod tests {
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
- let test = TestReader {
- data,
- metadata: Default::default(),
- requests: Default::default(),
- };
+ let test = TestReader::new(data);
let requests = test.requests.clone();
let a_scalar = StringArray::from_iter_values(["b"]);
@@ -2261,6 +2278,9 @@ mod tests {
assert_eq!(val, 3);
// Should only have made 3 requests
+ // * First request fetches data for evaluating the first predicate
+ // * Second request fetches data for evaluating the second predicate
+ // * Third request fetches data for evaluating the projection
assert_eq!(requests.lock().unwrap().len(), 3);
}