ethan-tyler commented on code in PR #19884:
URL: https://github.com/apache/datafusion/pull/19884#discussion_r2706191701
##########
datafusion/core/tests/custom_sources_cases/dml_planning.rs:
##########
@@ -246,6 +269,75 @@ async fn test_delete_complex_expr() -> Result<()> {
     Ok(())
 }
+#[tokio::test]
+async fn test_delete_filter_pushdown_extracts_table_scan_filters() -> Result<()> {
+    let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
Review Comment:
1) Nice DELETE + Exact pushdown regression test. I would add the same coverage
for UPDATE … WHERE … + TableProviderFilterPushDown::Exact, since
extract_dml_filters is used by both (rough sketch below).
2) I would also think about adding a "mixed location" case (some conjuncts in the
residual Filter, others in TableScan.filters) to lock in the union + dedup behavior.
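
Rough sketch of the UPDATE variant, assuming UPDATE goes through the same
extract_dml_filters path and that CaptureDeleteProvider exposes the same
constructor and captured-filter accessor as the DELETE test uses (untested;
names will likely need adjusting):

```rust
// Sketch only: mirrors the DELETE test above; the constructor argument and the
// captured-filter accessor are assumptions about the test helpers, not the PR.
#[tokio::test]
async fn test_update_filter_pushdown_extracts_table_scan_filters() -> Result<()> {
    let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
        TableProviderFilterPushDown::Exact,
    ));
    let ctx = SessionContext::new();
    ctx.register_table("t", provider.clone())?;

    // With Exact pushdown the WHERE clause should land in TableScan.filters,
    // and extract_dml_filters should still hand it to the provider.
    let df = ctx.sql("UPDATE t SET count = 0 WHERE id = 1").await?;
    df.collect().await?;

    // Hypothetical accessor: read back whatever filters the provider captured.
    assert_eq!(provider.captured_filters().len(), 1);
    Ok(())
}
```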
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1907,24 +1907,48 @@ fn get_physical_expr_pair(
 }
 /// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
 /// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
+/// the TableProvider's schema. Deduplicates filters to avoid passing the same
+/// predicate twice when filters appear in both Filter and TableScan nodes.
 ///
 fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
     let mut filters = Vec::new();
     input.apply(|node| {
-        if let LogicalPlan::Filter(filter) = node {
-            // Split AND predicates into individual expressions
-            filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+        match node {
+            LogicalPlan::Filter(filter) => {
+                // Split AND predicates into individual expressions
+                filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+            }
+            LogicalPlan::TableScan(TableScan {
Review Comment:
This collects TableScan.filters from any scan in the subtree. That works for
single-table DELETE/UPDATE, but is unsafe for UPDATE … FROM (extra scans). Should
we restrict extraction to the DML target scan (matching table_name or provider
identity) and fail closed if multiple candidate scans exist? Rough sketch below.
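
Something along these lines (illustrative and untested; the extra target_name
parameter and the error message are made up to show the fail-closed shape):

```rust
// Sketch: only harvest TableScan.filters from the scan matching the DML target,
// and refuse to guess when more than one candidate scan exists.
fn extract_dml_filters(
    input: &Arc<LogicalPlan>,
    target_name: &TableReference, // hypothetical: threaded in from the DML node
) -> Result<Vec<Expr>> {
    let mut filters = Vec::new();
    let mut target_scans = 0usize;
    input.apply(|node| {
        match node {
            LogicalPlan::Filter(filter) => {
                filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
            }
            LogicalPlan::TableScan(scan) if scan.table_name == *target_name => {
                target_scans += 1;
                for filter in &scan.filters {
                    filters.extend(split_conjunction(filter).into_iter().cloned());
                }
            }
            _ => {}
        }
        Ok(TreeNodeRecursion::Continue)
    })?;
    if target_scans > 1 {
        // Fail closed instead of merging filters from unrelated scans.
        return plan_err!("multiple scans of DML target {target_name}; not extracting filters");
    }
    filters.into_iter().map(strip_column_qualifiers).collect()
}
```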
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1907,24 +1907,48 @@ fn get_physical_expr_pair(
 }
 /// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
 /// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
+/// the TableProvider's schema. Deduplicates filters to avoid passing the same
+/// predicate twice when filters appear in both Filter and TableScan nodes.
 ///
 fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
     let mut filters = Vec::new();
     input.apply(|node| {
-        if let LogicalPlan::Filter(filter) = node {
-            // Split AND predicates into individual expressions
-            filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+        match node {
+            LogicalPlan::Filter(filter) => {
+                // Split AND predicates into individual expressions
+                filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+            }
+            LogicalPlan::TableScan(TableScan {
+                filters: scan_filters,
+                ..
+            }) => {
+                for filter in scan_filters {
+                    filters.extend(split_conjunction(filter).into_iter().cloned());
+                }
+            }
+            _ => {}
         }
         Ok(TreeNodeRecursion::Continue)
    })?;
-    // Strip table qualifiers from column references
-    filters.into_iter().map(strip_column_qualifiers).collect()
+    // Strip table qualifiers from column references and deduplicate.
+    // Deduplication is necessary because filters may appear in both Filter nodes
+    // and TableScan.filters when the optimizer pushes some predicates down.
+    // We deduplicate by (unqualified) expression to avoid passing the same filter twice.
+    let mut seen_filters = HashSet::new();
+    let deduped = filters
+        .into_iter()
+        .map(strip_column_qualifiers)
Review Comment:
Qualifier stripping is dangerous if any predicate references non-target
relations (t.id = u.id → id = id). I think we should validate all Column refs
belong to the DML target before stripping, and error out (fail closed)
otherwise.
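
For example, a pre-check along these lines (illustrative; the helper name, the
target_name parameter, and the error text are not from the PR):

```rust
// Sketch: before stripping qualifiers, reject any predicate that references a
// relation other than the DML target, so `t.id = u.id` can never become `id = id`.
fn ensure_columns_belong_to_target(expr: &Expr, target_name: &TableReference) -> Result<()> {
    for col in expr.column_refs() {
        if let Some(relation) = &col.relation {
            if relation != target_name {
                return plan_err!(
                    "DML filter {expr} references non-target relation {relation}; refusing to strip qualifiers"
                );
            }
        }
    }
    Ok(())
}
```

extract_dml_filters could call this on every collected predicate right before the
strip_column_qualifiers map.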
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1907,24 +1907,48 @@ fn get_physical_expr_pair(
 }
 /// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
 /// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
+/// the TableProvider's schema. Deduplicates filters to avoid passing the same
+/// predicate twice when filters appear in both Filter and TableScan nodes.
 ///
 fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
     let mut filters = Vec::new();
     input.apply(|node| {
-        if let LogicalPlan::Filter(filter) = node {
-            // Split AND predicates into individual expressions
-            filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+        match node {
+            LogicalPlan::Filter(filter) => {
+                // Split AND predicates into individual expressions
+                filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+            }
+            LogicalPlan::TableScan(TableScan {
+                filters: scan_filters,
+                ..
+            }) => {
+                for filter in scan_filters {
+                    filters.extend(split_conjunction(filter).into_iter().cloned());
+                }
+            }
+            _ => {}
         }
         Ok(TreeNodeRecursion::Continue)
    })?;
-    // Strip table qualifiers from column references
-    filters.into_iter().map(strip_column_qualifiers).collect()
+    // Strip table qualifiers from column references and deduplicate.
+    // Deduplication is necessary because filters may appear in both Filter nodes
+    // and TableScan.filters when the optimizer pushes some predicates down.
+    // We deduplicate by (unqualified) expression to avoid passing the same filter twice.
+    let mut seen_filters = HashSet::new();
+    let deduped = filters
Review Comment:
Dedup is done after stripping qualifiers; that can collapse distinct
qualified predicates in multi-scan plans. I would either enforce
single-target-scan eligibility or adjust dedup so it can't drop distinct
predicates.
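
e.g. keep the still-qualified expression as the dedup key and strip afterwards
(fragment of the function body, sketch only):

```rust
// Sketch: dedup on the qualified Expr first, then strip, so predicates that
// differ only in their qualifier (t.id = 1 vs u.id = 1) cannot collapse into one.
let mut seen = HashSet::new();
filters
    .into_iter()
    .filter(|f| seen.insert(f.clone()))
    .map(strip_column_qualifiers)
    .collect()
```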
##########
datafusion/core/tests/custom_sources_cases/dml_planning.rs:
##########
@@ -91,6 +107,13 @@ impl TableProvider for CaptureDeleteProvider {
             Field::new("count", DataType::UInt64, false),
         ])))))
     }
+
+    fn supports_filters_pushdown(
Review Comment:
Right now pushdown mode is uniform for all filters. To test mixed-location
behavior (Inexact vs Unsupported per conjunct), I would consider making this
provider return per-filter decisions so we can force residual and pushed-down
predicates in one query.
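
e.g. something like this (the pushdown_decisions field is hypothetical, not in
the PR):

```rust
// Sketch: the provider stores one decision per expected filter and falls back to
// Unsupported, so a single query can mix pushed-down and residual conjuncts.
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok((0..filters.len())
        .map(|i| {
            self.pushdown_decisions
                .get(i)
                .cloned()
                .unwrap_or(TableProviderFilterPushDown::Unsupported)
        })
        .collect())
}
```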