alamb commented on code in PR #20417:
URL: https://github.com/apache/datafusion/pull/20417#discussion_r2905271472
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -459,27 +460,37 @@ impl FileOpener for ParquetOpener {
// `row_filter` for details.
//
---------------------------------------------------------------------
- // Filter pushdown: evaluate predicates during scan
- if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
- let row_filter = row_filter::build_row_filter(
+ // Filter pushdown: evaluate predicates during scan.
Review Comment:
If we are deciding what filters to push down based on the projection and filter
columns, is the ParquetOpener the right place? I wonder if we should move the
determination earlier (like maybe don't bother to try to push down such filters
at all?)
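An earlier, planning-time version of that check could look something like the following. This is a minimal sketch, not the PR's actual code: `keeps_decode_savings` and the raw column-index inputs are illustrative stand-ins, not DataFusion APIs. The idea is that a conjunct only benefits from the RowFilter path when at least one projected column is not required by it.

```rust
use std::collections::HashSet;

// Illustrative helper (not a DataFusion API): decide, before the
// ParquetOpener is ever constructed, whether pushing a conjunct into the
// RowFilter can save any column decoding.
fn keeps_decode_savings(conjunct_cols: &[usize], projection_cols: &[usize]) -> bool {
    let needed: HashSet<usize> = conjunct_cols.iter().copied().collect();
    // Late materialization pays off only if at least one projected column
    // is NOT required by the conjunct, so its decode can be deferred for
    // rows the filter removes.
    projection_cols.iter().any(|c| !needed.contains(c))
}

fn main() {
    // Filter on column 0, projection [0]: no decode savings, so there is
    // nothing to gain from the RowFilter path.
    assert!(!keeps_decode_savings(&[0], &[0]));
    // Filter on column 0, projection [0, 1]: column 1's decode can be
    // deferred, so pushdown is worthwhile.
    assert!(keeps_decode_savings(&[0], &[0, 1]));
}
```

Running such a check at plan time would let the planner skip building a RowFilter entirely for the subset case, rather than demoting per conjunct inside the opener.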
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -2004,4 +2037,169 @@ mod test {
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
);
}
+
+ /// Per-conjunct RowFilter demotion: when a conjunct's required columns
+ /// cover all projected columns, it provides no column-decode savings
+ /// and is demoted to batch-level filtering. Conjuncts with extra
+ /// projected columns stay in the RowFilter for late materialization.
+ #[tokio::test]
+ async fn test_skip_row_filter_when_filter_cols_subset_of_projection() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // 4 rows: a=[1,2,2,4], b=[10,20,30,40]
+ let batch = record_batch!(
+ ("a", Int32, vec![Some(1), Some(2), Some(2), Some(4)]),
+ ("b", Int32, vec![Some(10), Some(20), Some(30), Some(40)])
+ )
+ .unwrap();
+ let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+ let schema = batch.schema();
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_size).unwrap(),
+ );
+
+ // Case 1: filter_cols == projection_cols → batch filter path
+ // Filter: a = 2, Projection: [a]
+ // Conjunct cols = {0}, projection = {0} → no extra cols to skip
+ // decoding → demoted to batch filter.
+ let expr = col("a").eq(lit(2));
+ let predicate = logical2physical(&expr, &schema);
+ let opener = ParquetOpenerBuilder::new()
+ .with_store(Arc::clone(&store))
+ .with_schema(Arc::clone(&schema))
+ .with_projection_indices(&[0])
+ .with_predicate(predicate)
+ .with_pushdown_filters(true)
+ .with_reorder_filters(true)
+ .build();
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
Review Comment:
I feel like this test should also assert something about the predicates more
directly (it currently asserts the number of rows that come out, rather than
the fact that the filter was actually pushed down)
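One shape such a direct assertion could take, sketched with a hypothetical `split_conjuncts` helper (not the PR's real types or metrics): compute how many conjuncts were routed to the RowFilter versus the batch filter and assert on that split, independent of how many rows the scan happens to return.

```rust
use std::collections::HashSet;

// Illustrative only: split conjuncts (each given as its required column
// indices) into (row_filter_count, batch_filter_count), mirroring the
// demotion rule the test exercises.
fn split_conjuncts(conjuncts: &[Vec<usize>], projection: &[usize]) -> (usize, usize) {
    let mut row_filter = 0;
    let mut batch = 0;
    for cols in conjuncts {
        let cols: HashSet<usize> = cols.iter().copied().collect();
        // A conjunct stays in the RowFilter only if some projected column
        // is not required by it, so its decode can be deferred.
        if projection.iter().any(|c| !cols.contains(c)) {
            row_filter += 1;
        } else {
            batch += 1;
        }
    }
    (row_filter, batch)
}

fn main() {
    // Case 1 from the test: filter a = 2 (cols {0}), projection [a].
    // Asserting the routing directly, rather than the output row count.
    assert_eq!(split_conjuncts(&[vec![0]], &[0]), (0, 1));
    // With projection [a, b], column b's decode can be deferred, so the
    // conjunct stays in the RowFilter.
    assert_eq!(split_conjuncts(&[vec![0]], &[0, 1]), (1, 0));
}
```

In the real test this would presumably be read from the opener's metrics rather than recomputed, but the assertion shape is the point: check which path each conjunct took.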
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]