yashrb24 opened a new issue, #21246:
URL: https://github.com/apache/datafusion/issues/21246

   ### Describe the bug
   
   `ProjectionExec::gather_filters_for_pushdown` silently rewrites filter predicates to the wrong source column when the projection's output schema contains duplicate column names. This produces incorrect query results (missing rows) with no error.
   
   The root cause is two functions that use name-only schema lookups, which 
always return the first match:
   
   **Bug 1** — `collect_reverse_alias` (`projection.rs`): Builds the reverse 
alias map using `column_with_name(&alias)`, which always returns the first 
column with that name. When two projection outputs share a name (e.g., `id` at 
positions 0 and 2), both iterations produce the same `HashMap` key 
`Column("id", 0)`, and the second silently overwrites the first.
   
    ```rust
    // column_with_name always returns the FIRST match
    let (aliased_index, _) =
        self.projector.output_schema().column_with_name(&projection.alias)?; // ← always returns 0 for duplicate names
    let aliased_col = Column::new(&projection.alias, aliased_index);
    alias_map.insert(aliased_col, Arc::clone(&projection.expr)); // ← overwrites on collision
    ```
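
    The collision can be modeled with plain std types (no Arrow dependency; the column names, alias/expr pairs, and the `build_alias_map` helper are illustrative, not the actual DataFusion code): a name-only lookup always yields the first position, so both `id` projections produce the same map key and the later insert wins.

    ```rust
    use std::collections::HashMap;

    // Build a reverse alias map the way collect_reverse_alias does, but with a
    // name-only lookup over plain strings instead of an Arrow schema.
    fn build_alias_map<'a>(
        output_columns: &[&str],
        projections: &[(&'a str, &'a str)],
    ) -> HashMap<(String, usize), &'a str> {
        let mut alias_map = HashMap::new();
        for &(alias, source_expr) in projections {
            // Like column_with_name: position of the FIRST column with this name.
            let aliased_index = output_columns.iter().position(|c| *c == alias).unwrap();
            // Both "id" projections produce the key ("id", 0), so the second
            // insert silently overwrites the first.
            alias_map.insert((alias.to_string(), aliased_index), source_expr);
        }
        alias_map
    }

    fn main() {
        let output_columns = ["id", "name", "id"];
        let projections = [("id", "right_id@2"), ("name", "name@1"), ("id", "left_id@0")];
        let map = build_alias_map(&output_columns, &projections);

        assert_eq!(map.len(), 2); // three projections collapse to two entries
        assert_eq!(map[&("id".to_string(), 0)], "left_id@0"); // right_id@2 mapping lost
        println!("{map:?}");
    }
    ```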
   
   **Bug 2** — `FilterRemapper::try_remap` (`filter_pushdown.rs`): Uses 
`schema.index_of(col.name())` which also returns the first match. A filter on 
`id@2` (the second occurrence) gets silently rewritten to `id@0`.
   
   ```rust
   // index_of always returns the FIRST match
   let Ok(new_index) = self.child_schema.index_of(col.name())
   // col was id@2 → new_index is 0 → rewrites to id@0
   ```
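
    The same first-match behavior can be shown with a minimal std-only model (plain strings instead of an Arrow `Schema`; the `remap_by_name` helper is made up for illustration): a column bound at index 2 gets rebound to index 0 because the lookup only sees the name.

    ```rust
    // Model of the try_remap lookup: like Schema::index_of, a search by name
    // returns the position of the FIRST column with that name.
    fn remap_by_name(child_columns: &[&str], col_name: &str) -> Option<usize> {
        child_columns.iter().position(|c| *c == col_name)
    }

    fn main() {
        let child_columns = ["id", "name", "id"];
        // The filter referenced id@2 (the second occurrence)...
        let (name, old_index) = ("id", 2);
        let new_index = remap_by_name(&child_columns, name).unwrap();
        // ...but the name-only lookup silently rebinds it to id@0.
        assert_eq!(old_index, 2);
        assert_eq!(new_index, 0);
        println!("{name}@{old_index} remapped to {name}@{new_index}");
    }
    ```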
   
   ### To Reproduce
   
   Construct a physical plan where a `ProjectionExec` produces duplicate column 
names from a join, with a `FilterExec` above it:
   
   ```
   FilterExec: id@0 = 3                  ← checks output col 0 (right side's id)
     ProjectionExec: [right_id@2 as id,  ← output col 0 (from RIGHT)
                      name@1,
                      left_id@0 as id]   ← output col 2 (from LEFT)
       HashJoinExec: Left
         left:  [left_id, name]          (columns 0-1)
         right: [right_id]               (column 2)
   ```
   
   The filter `id@0 = 3` should check the right side's id. After 
`FilterPushdown`:
   
   - Bug 1 overwrites `Column("id", 0) → right_id@2` with `Column("id", 0) → 
left_id@0`
   - The filter gets remapped to `left_id@0 = 3` — filtering the wrong column
   - Query returns incorrect results
   
   
    Here's a test that reproduces the issue; for convenience, add it to `datafusion/core/tests/physical_optimizer/filter_pushdown.rs`. It constructs the physical plan directly and runs it before and after `FilterPushdown`. The original plan returns 3 correct rows; the optimized plan returns 0. Run it with:
    ```bash
    cargo test -p datafusion --test core_integration -- test_filter_pushdown_projection_duplicate_column_names --nocapture
    ```
   
    ```rust
    /// Reproduces a bug where FilterPushdown through ProjectionExec with duplicate
    /// column names remaps filter predicates to the wrong source column.
    ///
    /// Plan structure:
    ///
    ///   FilterExec: id@0 IS NULL               ← checks output col 0 (right side's id)
    ///     ProjectionExec: [right_id@2 as id,   ← output col 0 (from RIGHT)
    ///                      name@1,
    ///                      left_id@0 as id]    ← output col 2 (from LEFT)
    ///       HashJoinExec: Left
    ///         left:  [left_id, name]       (columns 0-1)
    ///         right: [right_id]            (column 2)
    ///
    /// Bug 1 overwrites Column("id", 0) → left_id@0 instead of right_id@2.
    /// The filter `id@0 IS NULL` gets remapped to `left_id@0 IS NULL` (wrong side).
   #[tokio::test]
   async fn test_filter_pushdown_projection_duplicate_column_names() {
       use datafusion_common::JoinType;
       use datafusion_physical_expr::expressions::is_null;
       use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
   
       // Left table: orders (5 rows, all non-NULL left_id)
       let left_batches = vec![record_batch!(
           ("left_id", Int32, [1, 2, 3, 4, 5]),
           ("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"])
       )
       .unwrap()];
       let left_schema = Arc::new(Schema::new(vec![
           Field::new("left_id", DataType::Int32, false),
           Field::new("name", DataType::Utf8, false),
       ]));
       let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
           .with_batches(left_batches)
           .build();
   
       // Right table: returns (2 rows matching orders 1 and 3)
       let right_batches = vec![
           record_batch!(("right_id", Int32, [1, 3])).unwrap(),
       ];
       let right_schema = Arc::new(Schema::new(vec![Field::new(
           "right_id",
           DataType::Int32,
           false,
       )]));
       let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
           .with_batches(right_batches)
           .build();
   
       // HashJoinExec: LEFT JOIN on left_id = right_id
       // Join output schema: [left_id(0), name(1), right_id(2)]
       let join = Arc::new(
           HashJoinExec::try_new(
               left_scan,
               right_scan,
               vec![(
                   col("left_id", &left_schema).unwrap(),
                   col("right_id", &right_schema).unwrap(),
               )],
               None,
               &JoinType::Left,
               None,
               PartitionMode::CollectLeft,
               datafusion_common::NullEquality::NullEqualsNothing,
               false,
           )
           .unwrap(),
       );
       let join_schema = join.schema();
   
        // ProjectionExec: creates duplicate "id" columns
        //   output col 0: right_id@2 AS id  (from RIGHT side, NULL for unmatched)
        //   output col 1: name@1
        //   output col 2: left_id@0 AS id   (from LEFT side, never NULL)
       let projection = Arc::new(
           ProjectionExec::try_new(
               vec![
                   (col("right_id", &join_schema).unwrap(), "id".to_string()),
                   (col("name", &join_schema).unwrap(), "name".to_string()),
                   (col("left_id", &join_schema).unwrap(), "id".to_string()),
               ],
               join,
           )
           .unwrap(),
       );
   
       // FilterExec: id@0 IS NULL
       // This should check the RIGHT side's id (output col 0 = right_id).
       let filter_expr = is_null(Arc::new(Column::new("id", 0))).unwrap();
       let plan = Arc::new(
           FilterExec::try_new(filter_expr, projection).unwrap(),
       ) as Arc<dyn ExecutionPlan>;
   
       // Apply the physical FilterPushdown optimizer
       let config = ConfigOptions::default();
       let optimized = FilterPushdown::new()
           .optimize(Arc::clone(&plan), &config)
           .unwrap();
   
       // Execute both plans and compare
       let session_ctx = SessionContext::new_with_config(SessionConfig::new());
       session_ctx.register_object_store(
           ObjectStoreUrl::parse("test://").unwrap().as_ref(),
           Arc::new(InMemory::new()),
       );
       let task_ctx = session_ctx.state().task_ctx();
   
       let original_batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
           .await
           .unwrap();
       let original_count: usize =
           original_batches.iter().map(|b| b.num_rows()).sum();
   
       let optimized_batches = collect(optimized, Arc::clone(&task_ctx))
           .await
           .unwrap();
       let optimized_count: usize =
           optimized_batches.iter().map(|b| b.num_rows()).sum();
   
       // Original plan correctly returns 3 rows (orders 2, 4, 5 — unmatched)
       assert_eq!(original_count, 3, "original plan should return 3 rows");
   
        // BUG: optimized plan returns 0 rows because the filter was remapped to the wrong column
       // After fix, change to: assert_eq!(optimized_count, 3);
       assert_eq!(optimized_count, 0,
           "BUG: filter pushed to wrong column, expected 3 rows but got 0");
   }
   ```
   
   
   ### Expected behavior
   
   The filter should remain mapped to `right_id@2` and correctly filter on the 
right side's column.
   
   
   ### Additional context
   
   The alias-aware pushdown was added in #19404. Both `column_with_name` and `index_of` are Arrow schema methods that return the first match by design; the pushdown code needs to use positional indexing instead. The bug affects any use of the physical plan API where plans are constructed directly rather than built via the `LogicalPlanBuilder`. This code path is not exercised by normal SQL because the logical optimizer's `PushDownFilter` rule resolves qualified column references and pushes filters below projections before the physical plan is created.
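
    A minimal sketch of the positional-indexing direction, again with plain std types (the `build_alias_map_positional` helper and string "expressions" are illustrative, not a proposed patch): keying the map by each projection's own output position via `enumerate()` keeps duplicate aliases under distinct keys instead of colliding on the first name match.

    ```rust
    use std::collections::HashMap;

    // Positional variant: the output index comes from the projection's position,
    // not from a name lookup, so duplicate aliases cannot collide.
    fn build_alias_map_positional<'a>(
        projections: &[(&'a str, &'a str)],
    ) -> HashMap<(String, usize), &'a str> {
        let mut alias_map = HashMap::new();
        for (output_index, &(alias, source_expr)) in projections.iter().enumerate() {
            alias_map.insert((alias.to_string(), output_index), source_expr);
        }
        alias_map
    }

    fn main() {
        let projections = [("id", "right_id@2"), ("name", "name@1"), ("id", "left_id@0")];
        let map = build_alias_map_positional(&projections);

        // Both "id" outputs survive under distinct indices.
        assert_eq!(map.len(), 3);
        assert_eq!(map[&("id".to_string(), 0)], "right_id@2");
        assert_eq!(map[&("id".to_string(), 2)], "left_id@0");
        println!("{map:?}");
    }
    ```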
   

