yashrb24 opened a new issue, #21246:
URL: https://github.com/apache/datafusion/issues/21246
### Describe the bug
`ProjectionExec::gather_filters_for_pushdown` silently rewrites filter
predicates to the wrong source column when the projection's output schema
contains duplicate column names, producing incorrect query results (missing
rows) with no error.
The root cause is two functions that use name-only schema lookups, which
always return the first match:
**Bug 1** — `collect_reverse_alias` (`projection.rs`): Builds the reverse
alias map using `column_with_name(&alias)`, which always returns the first
column with that name. When two projection outputs share a name (e.g., `id` at
positions 0 and 2), both iterations produce the same `HashMap` key
`Column("id", 0)`, and the second silently overwrites the first.
```rust
// column_with_name always returns the FIRST match
let (aliased_index, _) = self
    .projector
    .output_schema()
    .column_with_name(&projection.alias)?; // ← always returns 0 for duplicate names
let aliased_col = Column::new(&projection.alias, aliased_index);
alias_map.insert(aliased_col, Arc::clone(&projection.expr)); // ← overwrites on collision
```
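The collision mechanics can be sketched with a std-only toy (the names and map types here are illustrative stand-ins for the real `Column`/expression types, not DataFusion's actual code):

```rust
use std::collections::HashMap;

/// Simplified stand-in for the reverse alias map construction. Keys are
/// (alias, index) pairs, but the index comes from a first-match-by-name
/// lookup, mirroring `column_with_name`.
fn build_alias_map() -> HashMap<(String, usize), &'static str> {
    // Projection outputs in order: [right_id@2 AS id, name@1, left_id@0 AS id]
    let outputs = [("id", "right_id@2"), ("name", "name@1"), ("id", "left_id@0")];
    let mut alias_map = HashMap::new();
    for (alias, expr) in outputs {
        // First position with this name: always 0 for BOTH `id` outputs.
        let first_index = outputs.iter().position(|(a, _)| *a == alias).unwrap();
        alias_map.insert((alias.to_string(), first_index), expr);
    }
    alias_map
}

fn main() {
    let alias_map = build_alias_map();
    // Both `id` outputs collapsed onto the key ("id", 0): 2 entries, not 3,
    // and the later insert (left_id@0) silently won.
    assert_eq!(alias_map.len(), 2);
    assert_eq!(alias_map[&("id".to_string(), 0)], "left_id@0");
}
```

Running this shows the map holds 2 entries instead of 3, with the second `id` projection having silently replaced the first.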
**Bug 2** — `FilterRemapper::try_remap` (`filter_pushdown.rs`): Uses
`schema.index_of(col.name())` which also returns the first match. A filter on
`id@2` (the second occurrence) gets silently rewritten to `id@0`.
```rust
// index_of always returns the FIRST match
let Ok(new_index) = self.child_schema.index_of(col.name())
// col was id@2 → new_index is 0 → rewrites to id@0
```
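The same first-match behavior can be reproduced in isolation (a std-only sketch with hypothetical field names; the real code calls `arrow_schema::Schema::index_of`):

```rust
/// First-match-by-name lookup over a schema with duplicate column names,
/// mirroring `Schema::index_of`.
fn index_of(fields: &[&str], name: &str) -> Option<usize> {
    fields.iter().position(|f| *f == name)
}

fn main() {
    // Projection output schema: [id, name, id]
    let fields = ["id", "name", "id"];
    // A filter column that really refers to position 2 (the second `id`)
    // is remapped by name alone and lands on position 0.
    let filter_col = ("id", 2usize);
    let new_index = index_of(&fields, filter_col.0).unwrap();
    assert_eq!(new_index, 0); // id@2 silently rewritten to id@0
}
```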
### To Reproduce
Construct a physical plan where a `ProjectionExec` produces duplicate column
names from a join, with a `FilterExec` above it:
```
FilterExec: id@0 = 3 ← checks output col 0 (right side's id)
ProjectionExec: [right_id@2 as id, ← output col 0 (from RIGHT)
name@1,
left_id@0 as id] ← output col 2 (from LEFT)
HashJoinExec: Left
left: [left_id, name] (columns 0-1)
right: [right_id] (column 2)
```
The filter `id@0 = 3` should check the right side's id. After `FilterPushdown`:
- Bug 1 overwrites `Column("id", 0) → right_id@2` with `Column("id", 0) → left_id@0`
- The filter gets remapped to `left_id@0 = 3`, filtering the wrong column
- Query returns incorrect results
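A fix would need to resolve columns by their position rather than by name. A minimal std-only sketch of position-based remapping (a hypothetical helper to illustrate the idea, not the actual DataFusion API):

```rust
/// Hypothetical position-based remap: resolve a filter column through the
/// projection by its output index, using the name only as a sanity check,
/// instead of scanning for the first matching name.
fn remap(outputs: &[(&str, &'static str)], name: &str, index: usize) -> Option<&'static str> {
    outputs
        .get(index)
        .filter(|(alias, _)| *alias == name)
        .map(|(_, expr)| *expr)
}

fn main() {
    // [right_id@2 AS id, name@1, left_id@0 AS id]
    let outputs = [("id", "right_id@2"), ("name", "name@1"), ("id", "left_id@0")];
    // id@0 resolves to the right side's column, id@2 to the left's.
    assert_eq!(remap(&outputs, "id", 0), Some("right_id@2"));
    assert_eq!(remap(&outputs, "id", 2), Some("left_id@0"));
}
```

With position as the key, the two `id` outputs never collide, so the filter in the plan above keeps pointing at `right_id@2`.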
Here's a test that reproduces the issue; for convenience, add it to
`datafusion/core/tests/physical_optimizer/filter_pushdown.rs`. It constructs
the physical plan directly and runs it before and after `FilterPushdown`. The
original plan returns 3 correct rows; the optimized plan returns 0. You can run
the test with:
```bash
cargo test -p datafusion --test core_integration -- \
  test_filter_pushdown_projection_duplicate_column_names --nocapture
```
```rust
/// Reproduces a bug where FilterPushdown through ProjectionExec with duplicate
/// column names remaps filter predicates to the wrong source column.
///
/// Plan structure:
///
/// FilterExec: id@0 IS NULL              ← checks output col 0 (right side's id)
///   ProjectionExec: [right_id@2 as id,  ← output col 0 (from RIGHT)
///                    name@1,
///                    left_id@0 as id]   ← output col 2 (from LEFT)
///     HashJoinExec: Left
///       left:  [left_id, name] (columns 0-1)
///       right: [right_id]      (column 2)
///
/// Bug 1 maps Column("id", 0) → left_id@0 instead of right_id@2, so the
/// filter `id@0 IS NULL` gets remapped to `left_id@0 IS NULL` (wrong side).
#[tokio::test]
async fn test_filter_pushdown_projection_duplicate_column_names() {
use datafusion_common::JoinType;
use datafusion_physical_expr::expressions::is_null;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
// Left table: orders (5 rows, all non-NULL left_id)
let left_batches = vec![record_batch!(
("left_id", Int32, [1, 2, 3, 4, 5]),
("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"])
)
.unwrap()];
let left_schema = Arc::new(Schema::new(vec![
Field::new("left_id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
.with_batches(left_batches)
.build();
// Right table: returns (2 rows matching orders 1 and 3)
let right_batches = vec![
record_batch!(("right_id", Int32, [1, 3])).unwrap(),
];
let right_schema = Arc::new(Schema::new(vec![Field::new(
"right_id",
DataType::Int32,
false,
)]));
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
.with_batches(right_batches)
.build();
// HashJoinExec: LEFT JOIN on left_id = right_id
// Join output schema: [left_id(0), name(1), right_id(2)]
let join = Arc::new(
HashJoinExec::try_new(
left_scan,
right_scan,
vec![(
col("left_id", &left_schema).unwrap(),
col("right_id", &right_schema).unwrap(),
)],
None,
&JoinType::Left,
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
let join_schema = join.schema();
// ProjectionExec: creates duplicate "id" columns
    //   output col 0: right_id@2 AS id (from RIGHT side, NULL for unmatched)
    //   output col 1: name@1
    //   output col 2: left_id@0 AS id  (from LEFT side, never NULL)
let projection = Arc::new(
ProjectionExec::try_new(
vec![
(col("right_id", &join_schema).unwrap(), "id".to_string()),
(col("name", &join_schema).unwrap(), "name".to_string()),
(col("left_id", &join_schema).unwrap(), "id".to_string()),
],
join,
)
.unwrap(),
);
// FilterExec: id@0 IS NULL
// This should check the RIGHT side's id (output col 0 = right_id).
let filter_expr = is_null(Arc::new(Column::new("id", 0))).unwrap();
let plan = Arc::new(
FilterExec::try_new(filter_expr, projection).unwrap(),
) as Arc<dyn ExecutionPlan>;
// Apply the physical FilterPushdown optimizer
let config = ConfigOptions::default();
let optimized = FilterPushdown::new()
.optimize(Arc::clone(&plan), &config)
.unwrap();
// Execute both plans and compare
let session_ctx = SessionContext::new_with_config(SessionConfig::new());
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let task_ctx = session_ctx.state().task_ctx();
let original_batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
.await
.unwrap();
let original_count: usize =
original_batches.iter().map(|b| b.num_rows()).sum();
let optimized_batches = collect(optimized, Arc::clone(&task_ctx))
.await
.unwrap();
let optimized_count: usize =
optimized_batches.iter().map(|b| b.num_rows()).sum();
// Original plan correctly returns 3 rows (orders 2, 4, 5 — unmatched)
assert_eq!(original_count, 3, "original plan should return 3 rows");
    // BUG: the optimized plan returns 0 rows because the filter was remapped
    // to the wrong column. After the fix, change this to:
    // assert_eq!(optimized_count, 3);
    assert_eq!(
        optimized_count, 0,
        "BUG: filter pushed to wrong column, expected 3 rows but got 0"
    );
}
```
### Expected behavior
The filter should remain mapped to `right_id@2` and correctly filter on the
right side's column.
### Additional context
The alias-aware pushdown was added in #19404. Both `column_with_name` and
`index_of` are Arrow schema methods that return the first match by design, so
the code needs to use positional indexing instead. The bug affects any use of
the physical plan API where plans are constructed directly rather than built
via `LogicalPlanBuilder`. This code path is not exercised through normal SQL
because the logical optimizer's `PushDownFilter` resolves qualified column
references and pushes filters below projections before the physical plan is
created.