haohuaijin commented on code in PR #21527:
URL: https://github.com/apache/datafusion/pull/21527#discussion_r3078556470
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1424,6 +1426,50 @@ impl ExecutionPlan for SortExec {
Ok(FilterDescription::new().with_child(child))
}
+
+ fn handle_child_pushdown_result(
+ &self,
+ phase: FilterPushdownPhase,
+ child_pushdown_result: ChildPushdownResult,
+ _config: &datafusion_common::config::ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ // Only absorb filters in Pre phase for a plain sort (no fetch).
+ // A sort with fetch (TopK) must not accept filters: reordering
+ // filter vs. limit would change semantics.
+ if phase != FilterPushdownPhase::Pre || self.fetch.is_some() {
Review Comment:
i add a test case in
[10f1462](https://github.com/apache/datafusion/pull/21527/commits/10f14626f56c49da40d982d5382436f436c4cb9b)
if i remove the `phase != FilterPushdownPhase::Pre`, the result will like
below, we insert the dynamic filter under the sort, before the datasource if
datasource do not accept dynamic filter,
```rust
// The probe-side DataSourceExec does not accept the dynamic filter, so
// SortExec intercepts it and inserts a FilterExec below itself.
// The FilterExec holds the DynamicFilter predicate and evaluates it at
// runtime once HashJoin populates it with the build-side values.
insta::assert_snapshot!(
OptimizationTest::new(
Arc::clone(&plan),
FilterPushdown::new_post_optimization(),
true
),
@r"
OptimizationTest:
input:
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0),
(b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=false
output:
Ok:
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0),
(b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=true
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- FilterExec: DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b], file_type=test, pushdown_supported=false
"
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]