darmie commented on code in PR #20417:
URL: https://github.com/apache/datafusion/pull/20417#discussion_r2828103459
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -460,26 +463,80 @@ impl FileOpener for ParquetOpener {
// ---------------------------------------------------------------------
// Filter pushdown: evaluate predicates during scan
- if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
- let row_filter = row_filter::build_row_filter(
- &predicate,
- &physical_file_schema,
- builder.metadata(),
- reorder_predicates,
- &file_metrics,
- );
+ //
+ // When predicate columns exactly match the projection columns
+ // and there is at most one static conjunct (excluding dynamic
+ // filters from TopK / joins), the RowFilter (late
+ // materialization) path provides no benefit: every projected
+ // column is a predicate column, so there are no extra columns
+ // whose decode could be skipped for non-matching rows, and
+ // with a single static conjunct there is no incremental
+ // evaluation advantage. Apply the predicate as a batch-level
+ // filter after decoding instead, avoiding the overhead of
+ // CachedArrayReader / ReadPlanBuilder / try_next_batch.
+ //
+ // When there are non-predicate projection columns (e.g.
+ // SELECT * WHERE col = X), RowFilter is valuable because it
+ // skips decoding those extra columns for non-matching rows.
+ //
+ // Multi-conjunct static predicates are left on the RowFilter
+ // path because the RowFilter evaluates conjuncts incrementally
+ // — a selective first conjunct can avoid decoding expensive
+ // later columns for non-matching rows.
+ let batch_filter_predicate = if let Some(predicate) =
+ pushdown_filters.then_some(predicate).flatten()
+ {
+ let predicate_col_indices: HashSet<usize> =
+ collect_columns(&predicate)
+ .iter()
+ .map(|c| c.index())
+ .collect();
+ let projection_col_indices: HashSet<usize> =
+ projection.column_indices().into_iter().collect();
+
+ // Count only static conjuncts — dynamic filters (e.g.
+ // from TopK or join pushdown) are runtime-generated and
+ // reference the same projected columns, so they don't
+ // benefit from RowFilter's incremental evaluation.
+ let static_conjunct_count = split_conjunction(&predicate)
+ .iter()
+ .filter(|c| !is_dynamic_physical_expr(c))
+ .count();
+ let skip_row_filter = !predicate_col_indices.is_empty()
+ && predicate_col_indices == projection_col_indices
+ && static_conjunct_count <= 1;
+
+ if skip_row_filter {
Review Comment:
Hmm, I believe that example is handled correctly.
For `select a, b from t where a = 1 and b = 2`:
- `predicate_col_indices = {a, b}` and `projection_col_indices = {a, b}`, so the two sets match exactly
- `static_conjunct_count = 2`, so the `<= 1` check fails and the RowFilter path is used
The batch filter path triggers only when all of these conditions hold:
1. predicate columns exactly equal projection columns (strict `==`, not a subset check)
2. at most one static conjunct
3. predicate columns are non-empty
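
As a rough illustration of those three conditions, here is a minimal standalone sketch. The names `skip_row_filter` and `cols` are hypothetical helpers for this example, not code from the PR, and it assumes column `a` has index 0 and column `b` has index 1:

```rust
use std::collections::HashSet;

/// Hypothetical helper: build a set of column indices from a slice.
fn cols(indices: &[usize]) -> HashSet<usize> {
    indices.iter().copied().collect()
}

/// Sketch of the decision described above (an illustration, not the PR code).
/// Returns true when the batch-level filter path would be chosen.
fn skip_row_filter(
    predicate_cols: &HashSet<usize>,
    projection_cols: &HashSet<usize>,
    static_conjunct_count: usize,
) -> bool {
    // Condition 3: the predicate must reference at least one column.
    !predicate_cols.is_empty()
        // Condition 1: strict set equality, not a subset check.
        && predicate_cols == projection_cols
        // Condition 2: at most one static (non-dynamic) conjunct.
        && static_conjunct_count <= 1
}
```

Under these assumptions, `select a, b from t where a = 1 and b = 2` corresponds to `skip_row_filter(&cols(&[0, 1]), &cols(&[0, 1]), 2)`, which is `false`, so the query stays on the RowFilter path. `select a from t where a = 1` corresponds to `skip_row_filter(&cols(&[0]), &cols(&[0]), 1)`, which is `true`.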
I agree that the ideal approach is per-conjunct: evaluate each filter
individually and only promote to RowFilter the ones that provide column-skip
savings. That's closer to what adriangb is exploring in #20363 with adaptive
selectivity tracking. This PR is a narrower fix for the simplest degenerate
case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]