jackkleeman opened a new issue, #20285:
URL: https://github.com/apache/datafusion/issues/20285

   ### Describe the bug
   
   We have been noticing errors during execution of dynamic-filter-pushdown 
queries like:
   ```
   restate sql 'select 1 from sys_invocation_status where  partition_key is not 
null order by modified_at  limit 5'
   Error: <UNKNOWN> Datafusion error: Arrow error: Invalid argument error: 
Invalid comparison operation: UInt64 < Timestamp(ms, "+00:00")
   ```
   Implying that our dynamic filters are referring to the wrong columns 
(partition_key mixed up with modified_at). The table provider we maintain does 
not do any column name based remapping so the column indices need to be correct.
   
   I have bisected and the issue is introduced in 
https://github.com/apache/datafusion/pull/18719 ie v52 and v52.1 cc @adriangb, 
because with that PR, dynamic filters can pass through projections into 
filterexecs and this exposes the issue in filterexec where the indices are not 
remapped. I have managed to write a test fails now, and passes again with a fix 
that I will make a PR for shortly, but it also failed before #18719 (as the bug 
was still present but wasn't being triggered). I've struggled to write an 
integration test for this bug, because TestSource uses reassign_expr_columns 
and so is resistant to index mixups? Is this what I should be doing too?
   
   ### To Reproduce
   
   ```rust
    #[test]
       fn test_filter_with_projection_remaps_post_phase_parent_filters() -> 
Result<()> {
           // Test that FilterExec with a projection must remap parent dynamic
           // filter column indices from its output schema to the input schema
           // before passing them to the child.
           let input_schema = Arc::new(Schema::new(vec![
               Field::new("a", DataType::Int32, false),
               Field::new("b", DataType::Utf8, false),
               Field::new("c", DataType::Float64, false),
           ]));
           let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
   
           // FilterExec: a > 0, projection=[c@2]
           let predicate = Arc::new(BinaryExpr::new(
               Arc::new(Column::new("a", 0)),
               Operator::Gt,
               Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
           ));
           let filter = FilterExecBuilder::new(predicate, input)
               .apply_projection(Some(vec![2]))?
               .build()?;
   
           // Output schema should be [c:Float64]
           let output_schema = filter.schema();
           assert_eq!(output_schema.fields().len(), 1);
           assert_eq!(output_schema.field(0).name(), "c");
   
           // Simulate a parent dynamic filter referencing output column c@0
           let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 
0));
   
           let config = ConfigOptions::new();
           let desc = filter.gather_filters_for_pushdown(
               FilterPushdownPhase::Post,
               vec![parent_filter],
               &config,
           )?;
   
           // The filter pushed to the child must reference c@2 (input schema),
           // not c@0 (output schema).
           let parent_filters = desc.parent_filters();
           assert_eq!(parent_filters.len(), 1); // one child
           assert_eq!(parent_filters[0].len(), 1); // one filter
           let remapped = &parent_filters[0][0].predicate;
           let display = format!("{remapped}");
           assert_eq!(
               display, "c@2",
               "Post-phase parent filter column index must be remapped \
                from output schema (c@0) to input schema (c@2)"
           );
   
           Ok(())
       }
   ```
   
   ### Expected behavior
   
   Passed on parent dynamic filters refer to the input schema indices, but 
instead they refer to the output schema indices
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to