adriangb commented on code in PR #21527:
URL: https://github.com/apache/datafusion/pull/21527#discussion_r3073852152
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1424,6 +1426,50 @@ impl ExecutionPlan for SortExec {
Ok(FilterDescription::new().with_child(child))
}
+
+ fn handle_child_pushdown_result(
+ &self,
+ phase: FilterPushdownPhase,
+ child_pushdown_result: ChildPushdownResult,
+ _config: &datafusion_common::config::ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ // Only absorb filters in Pre phase for a plain sort (no fetch).
+ // A sort with fetch (TopK) must not accept filters: reordering
+ // filter vs. limit would change semantics.
+ if phase != FilterPushdownPhase::Pre || self.fetch.is_some() {
+ return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
+ }
+
+ // Collect parent filters that were NOT successfully pushed to our child.
+ let unsupported_filters: Vec<Arc<dyn PhysicalExpr>> = child_pushdown_result
+ .parent_filters
+ .iter()
+ .filter(|&f| matches!(f.all(), PushedDown::No))
+ .map(|f| Arc::clone(&f.filter))
+ .collect();
+
+ if unsupported_filters.is_empty() {
+ // All filters were pushed — nothing extra to do.
+ return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
+ }
+
+ // Build a single conjunctive predicate from the unsupported filters
+ // and insert a FilterExec between this SortExec and its child.
+ let predicate = datafusion_physical_expr::conjunction(unsupported_filters);
+ let new_child =
+ Arc::new(FilterExec::try_new(predicate, Arc::clone(self.input()))?)
+ as Arc<dyn ExecutionPlan>;
+ let new_sort = Arc::new(
+ SortExec::new(self.expr.clone(), new_child)
+ .with_fetch(self.fetch())
+ .with_preserve_partitioning(self.preserve_partitioning()),
+ ) as Arc<dyn ExecutionPlan>;
+
+ Ok(FilterPushdownPropagation {
+ filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
+ updated_node: Some(new_sort),
+ })
Review Comment:
Swapping the order from `FilterExec -> SortExec` to `SortExec -> FilterExec`
makes sense to me 👍🏻
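For reference, a minimal standalone sketch (plain Rust, not DataFusion code) of why the swap is safe for a plain sort. Rows are modeled as `(key, value)` tuples, the closures `f1`/`f2` are hypothetical stand-ins for the unsupported parent filters, and `conj` plays the role of `conjunction(...)` in the diff. Since `sort_by_key` is stable and filtering preserves relative order, filtering before or after a full sort should yield the same rows in the same order.

```rust
// Illustrative model only: `Row`, `sort`, and `filter` are hypothetical
// helpers, not DataFusion APIs.
type Row = (i32, &'static str);

/// Full sort by key, analogous to a SortExec without `fetch`.
/// `sort_by_key` is stable, so rows with equal keys keep their input order.
fn sort(rows: &[Row]) -> Vec<Row> {
    let mut out = rows.to_vec();
    out.sort_by_key(|r| r.0);
    out
}

/// Keeps only rows matching `pred`, preserving their relative order.
fn filter(rows: &[Row], pred: &dyn Fn(&Row) -> bool) -> Vec<Row> {
    rows.iter().copied().filter(|r| pred(r)).collect()
}

fn main() {
    let input: Vec<Row> = vec![(3, "c"), (1, "a"), (4, "d"), (1, "b"), (5, "e")];

    // Two parent filters that the child could not absorb...
    let f1 = |r: &Row| r.0 > 1;
    let f2 = |r: &Row| r.1 != "e";
    // ...combined into a single conjunctive predicate.
    let conj = |r: &Row| f1(r) && f2(r);

    // Original plan shape: FilterExec above SortExec.
    let filter_above_sort = filter(&sort(&input), &conj);
    // Rewritten plan shape: SortExec above FilterExec.
    let sort_above_filter = sort(&filter(&input, &conj));

    assert_eq!(filter_above_sort, sort_above_filter);
    println!("{:?}", sort_above_filter); // prints [(3, "c"), (4, "d")]
}
```

A side benefit of the rewritten shape is that the sort only has to process rows that survive the filter.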
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -5146,3 +5150,350 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
"
);
}
+
+// ==== Filter pushdown through SortExec tests ====
+
+/// FilterExec above a plain SortExec (no fetch) should be pushed below it.
+/// The scan supports pushdown, so the filter lands in the DataSourceExec.
+#[test]
Review Comment:
Can we add these as SLT tests instead?
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1424,6 +1426,50 @@ impl ExecutionPlan for SortExec {
+ if phase != FilterPushdownPhase::Pre || self.fetch.is_some() {
Review Comment:
Could you explain why `FilterPushdownPhase::Post` should not be allowed,
maybe with examples of where it would be incorrect?
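The other half of this guard, the `fetch` check, can be illustrated with a minimal standalone sketch (plain Rust, not DataFusion code). A TopK is modeled here as "sort ascending, then keep the first `fetch` rows", and `keep_even` is a hypothetical stand-in for a parent filter. Moving the filter below the limit changes which rows come out, which matches the code comment's claim that reordering filter and limit would change semantics.

```rust
// Illustrative model only: `top_k` and `keep_even` are hypothetical helpers.

/// Sort ascending, then keep the first `fetch` rows (a SortExec with `fetch`).
fn top_k(rows: &[i32], fetch: usize) -> Vec<i32> {
    let mut out = rows.to_vec();
    out.sort();
    out.truncate(fetch);
    out
}

/// A simple parent filter: keep only even values.
fn keep_even(rows: &[i32]) -> Vec<i32> {
    rows.iter().copied().filter(|v| v % 2 == 0).collect()
}

fn main() {
    let input = vec![5, 1, 4, 2, 3, 6];

    // Original plan: FilterExec above SortExec(fetch=2).
    // TopK yields [1, 2]; the filter then keeps [2].
    let filter_above_topk = keep_even(&top_k(&input, 2));

    // Incorrect rewrite: SortExec(fetch=2) above FilterExec.
    // The filter yields [4, 2, 6]; TopK then keeps [2, 4].
    let topk_above_filter = top_k(&keep_even(&input), 2);

    assert_eq!(filter_above_topk, vec![2]);
    assert_eq!(topk_above_filter, vec![2, 4]);
    assert_ne!(filter_above_topk, topk_above_filter);
}
```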