jackkleeman opened a new issue, #20285: URL: https://github.com/apache/datafusion/issues/20285
### Describe the bug

We have been noticing errors during execution of dynamic-filter-pushdown queries like:

```
restate sql 'select 1 from sys_invocation_status where partition_key is not null order by modified_at limit 5'
Error: <UNKNOWN> Datafusion error: Arrow error: Invalid argument error: Invalid comparison operation: UInt64 < Timestamp(ms, "+00:00")
```

This implies that our dynamic filters are referring to the wrong columns (`partition_key` mixed up with `modified_at`). The table provider we maintain does not do any name-based column remapping, so the column indices need to be correct.

I have bisected the issue to https://github.com/apache/datafusion/pull/18719, i.e. v52 and v52.1 (cc @adriangb). With that PR, dynamic filters can pass through projections into `FilterExec`s, which exposes an existing bug in `FilterExec`: when it has a projection, the column indices of parent filters are not remapped from its output schema to its input schema.

I have managed to write a test that fails now and passes again with a fix that I will open a PR for shortly. The test also failed before #18719, because the bug was already present but wasn't being triggered.

I've struggled to write an integration test for this bug. I think that's because `TestSource` uses `reassign_expr_columns` and so is resistant to index mix-ups. Is that what I should be doing too?

### To Reproduce

```rust
#[test]
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
    // Test that FilterExec with a projection must remap parent dynamic
    // filter column indices from its output schema to the input schema
    // before passing them to the child.
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));

    // FilterExec: a > 0, projection=[c@2]
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
    ));
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![2]))?
        .build()?;

    // Output schema should be [c:Float64]
    let output_schema = filter.schema();
    assert_eq!(output_schema.fields().len(), 1);
    assert_eq!(output_schema.field(0).name(), "c");

    // Simulate a parent dynamic filter referencing output column c@0
    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));

    let config = ConfigOptions::new();
    let desc = filter.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![parent_filter],
        &config,
    )?;

    // The filter pushed to the child must reference c@2 (input schema),
    // not c@0 (output schema).
    let parent_filters = desc.parent_filters();
    assert_eq!(parent_filters.len(), 1); // one child
    assert_eq!(parent_filters[0].len(), 1); // one filter
    let remapped = &parent_filters[0][0].predicate;
    let display = format!("{remapped}");
    assert_eq!(
        display, "c@2",
        "Post-phase parent filter column index must be remapped \
         from output schema (c@0) to input schema (c@2)"
    );

    Ok(())
}
```

### Expected behavior

Parent dynamic filters that are passed on to the child should refer to the input schema's column indices, but instead they refer to the output schema's indices.

### Additional context

_No response_
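The remapping that the test above expects can be illustrated without DataFusion. The following is a minimal, self-contained sketch, not DataFusion's implementation: `Col`, `Expr`, `col` and `remap_through_projection` are hypothetical names invented for illustration. It assumes, as in the test, that the `FilterExec` projection is a list of input column indices, so output column `i` comes from input column `projection[i]`, and that a parent filter must have every column rewritten through that list before it is pushed to the child.

```rust
use std::fmt;

/// Hypothetical stand-in for a physical column reference: a name plus a
/// positional index into some schema, displayed as `name@index`.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: String,
    index: usize,
}

/// Hypothetical minimal expression tree: columns, integer literals and `<`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(Col),
    Literal(i64),
    Lt(Box<Expr>, Box<Expr>),
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(c) => write!(f, "{}@{}", c.name, c.index),
            Expr::Literal(v) => write!(f, "{v}"),
            Expr::Lt(l, r) => write!(f, "{l} < {r}"),
        }
    }
}

/// Convenience constructor for a column expression (hypothetical helper).
fn col(name: &str, index: usize) -> Expr {
    Expr::Column(Col { name: name.to_string(), index })
}

/// Rewrite every column in `expr` from output-schema indices to input-schema
/// indices. `projection[i]` is the input index that produced output column `i`.
/// Returns `None` if a column index falls outside the projection.
fn remap_through_projection(expr: &Expr, projection: &[usize]) -> Option<Expr> {
    match expr {
        Expr::Column(c) => {
            let input_index = *projection.get(c.index)?;
            Some(Expr::Column(Col { name: c.name.clone(), index: input_index }))
        }
        Expr::Literal(v) => Some(Expr::Literal(*v)),
        Expr::Lt(l, r) => Some(Expr::Lt(
            Box::new(remap_through_projection(l, projection)?),
            Box::new(remap_through_projection(r, projection)?),
        )),
    }
}

fn main() {
    // Same shape as the test: input [a, b, c], projection keeps only c (index 2).
    let pushed = remap_through_projection(&col("c", 0), &[2]).unwrap();
    println!("{pushed}"); // c@2

    // Hypothetical input [partition_key, modified_at] projected to [modified_at]:
    // pushed down unchanged, index 0 would point at partition_key instead.
    let parent = Expr::Lt(Box::new(col("modified_at", 0)), Box::new(Expr::Literal(100)));
    let pushed = remap_through_projection(&parent, &[1]).unwrap();
    println!("{pushed}"); // modified_at@1 < 100
}
```

The second case in `main` mirrors the symptom in the report: if the filter reaches the child with its output-schema index intact, it compares the wrong column, which can surface as a type mismatch such as `UInt64 < Timestamp`. Remapping by position keeps a table provider that trusts the indices it receives, like the one described here, correct without any name-based lookup.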
