adriangb commented on code in PR #21267:
URL: https://github.com/apache/datafusion/pull/21267#discussion_r3025836766
##########
datafusion/physical-plan/src/limit.rs:
##########
@@ -436,6 +467,29 @@ impl ExecutionPlan for LocalLimitExec {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
+
+ fn gather_filters_for_pushdown(
+ &self,
+ _phase: FilterPushdownPhase,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ _config: &ConfigOptions,
+ ) -> Result<FilterDescription> {
+ FilterDescription::from_children(parent_filters, &self.children())
+ }
+
+ fn handle_child_pushdown_result(
+ &self,
+ _phase: FilterPushdownPhase,
+ child_pushdown_result: ChildPushdownResult,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ result.updated_node =
+ Some(Arc::new(LocalLimitExec::new(updated_child, self.fetch))
as _);
Review Comment:
Same here: could this use `Self::new(...)` rather than naming `LocalLimitExec` explicitly?
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -609,8 +655,14 @@ impl ExecutionPlan for FilterExec {
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
- if phase != FilterPushdownPhase::Pre {
- return
Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
+ if !matches!(phase, FilterPushdownPhase::Pre) {
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ let mut new_self = self.clone();
+ new_self.input = updated_child;
+ result.updated_node = Some(Arc::new(new_self) as _);
+ }
+ return Ok(result);
Review Comment:
Good catch! This is definitely a footgun. Any thoughts on how we can make
the API less error prone? It would also be nice to put this into its own PR so
we can fast track it.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -433,7 +433,13 @@ impl ExecutionPlan for ProjectionExec {
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
- Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ let mut new_self = self.clone();
+ new_self.input = updated_child;
+ result.updated_node = Some(Arc::new(new_self) as _);
+ }
+ Ok(result)
Review Comment:
👍🏻
##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -444,13 +444,15 @@ impl ChildFilterDescription {
})
}
- /// Mark all parent filters as unsupported for this child.
+ /// Create a child filter description where all parent filters are marked
as unsupported,
+ /// and no self filters are pushed down.
pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
+ let parent_filters = parent_filters
+ .iter()
+ .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
+ .collect();
Self {
- parent_filters: parent_filters
- .iter()
- .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
- .collect(),
+ parent_filters,
Review Comment:
👍🏻
##########
datafusion/physical-plan/src/limit.rs:
##########
@@ -251,6 +256,32 @@ impl ExecutionPlan for GlobalLimitExec {
fn supports_limit_pushdown(&self) -> bool {
true
}
+
+ fn gather_filters_for_pushdown(
+ &self,
+ _phase: FilterPushdownPhase,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ _config: &ConfigOptions,
+ ) -> Result<FilterDescription> {
+ FilterDescription::from_children(parent_filters, &self.children())
+ }
+
+ fn handle_child_pushdown_result(
+ &self,
+ _phase: FilterPushdownPhase,
+ child_pushdown_result: ChildPushdownResult,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ result.updated_node = Some(Arc::new(GlobalLimitExec::new(
Review Comment:
```suggestion
result.updated_node = Some(Arc::new(Self::new(
```
?
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -277,7 +277,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
- Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ let mut new_self = self.clone();
+ new_self.input = updated_child;
+ result.updated_node = Some(Arc::new(new_self) as _);
+ }
+ Ok(result)
Review Comment:
👍🏻
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1430,6 +1431,21 @@ impl ExecutionPlan for SortExec {
Ok(FilterDescription::new().with_child(child))
}
+
+ fn handle_child_pushdown_result(
+ &self,
+ _phase: FilterPushdownPhase,
+ child_pushdown_result: ChildPushdownResult,
+ _config: &datafusion_common::config::ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ let mut new_self = self.cloned();
+ new_self.input = updated_child;
+ result.updated_node = Some(Arc::new(new_self) as _);
+ }
+ Ok(result)
+ }
Review Comment:
We really need to eliminate this footgun 🦶🏻
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1201,7 +1201,13 @@ impl ExecutionPlan for RepartitionExec {
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
- Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
+ let mut result =
FilterPushdownPropagation::if_all(child_pushdown_result);
+ if let Some(updated_child) = result.updated_node {
+ let mut new_self = self.clone();
+ new_self.input = updated_child;
+ result.updated_node = Some(Arc::new(new_self) as _);
+ }
+ Ok(result)
}
Review Comment:
👍🏻
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]