This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 05e00aeb17 [branch-53] Fix DELETE/UPDATE filter extraction when
predicates are pushed down into TableScan (#19884) (#20898)
05e00aeb17 is described below
commit 05e00aeb17fc17adc3ffea90de8c786b7d8a9673
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:59:10 2026 -0400
[branch-53] Fix DELETE/UPDATE filter extraction when predicates are pushed
down into TableScan (#19884) (#20898)
- Part of https://github.com/apache/datafusion/issues/19692
- Closes https://github.com/apache/datafusion/issues/19840 on branch-53
This PR backports https://github.com/apache/datafusion/pull/19884 (by @kosiew)
to the branch-53 line.
Co-authored-by: kosiew <[email protected]>
---
datafusion/core/src/physical_planner.rs | 153 +++++++-
.../tests/custom_sources_cases/dml_planning.rs | 426 ++++++++++++++++++++-
datafusion/sql/src/statement.rs | 11 +-
datafusion/sqllogictest/test_files/update.slt | 63 +--
4 files changed, 610 insertions(+), 43 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 14f3e5cf03..12406b6c29 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -18,7 +18,7 @@
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
use std::borrow::Cow;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::datasource::file_format::file_type_to_format;
@@ -84,7 +84,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
-use datafusion_expr::utils::split_conjunction;
+use datafusion_expr::utils::{expr_to_columns, split_conjunction};
use datafusion_expr::{
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat,
Extension,
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType,
StringifiedPlan,
@@ -757,7 +757,7 @@ impl DefaultPhysicalPlanner {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
- let filters = extract_dml_filters(input)?;
+ let filters = extract_dml_filters(input, table_name)?;
provider
.table_provider
.delete_from(session_state, filters)
@@ -783,7 +783,7 @@ impl DefaultPhysicalPlanner {
{
// For UPDATE, the assignments are encoded in the
projection of input
// We pass the filters and let the provider handle the
projection
- let filters = extract_dml_filters(input)?;
+ let filters = extract_dml_filters(input, table_name)?;
// Extract assignments from the projection in input plan
let assignments = extract_update_assignments(input)?;
provider
@@ -2067,24 +2067,149 @@ fn get_physical_expr_pair(
}
/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
-/// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
///
-fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into
individual expressions.
+///
+/// For UPDATE...FROM queries involving multiple tables, this function only
extracts predicates
+/// that reference the target table. Filters from source table scans are
excluded to prevent
+/// incorrect filter semantics.
+///
+/// Column qualifiers are stripped so expressions can be evaluated against the
TableProvider's
+/// schema. Deduplication is performed because filters may appear in both
Filter nodes and
+/// TableScan.filters when the optimizer performs partial (Inexact) filter
pushdown.
+///
+/// # Parameters
+/// - `input`: The logical plan tree to extract filters from (typically a
DELETE or UPDATE plan)
+/// - `target`: The target table reference to scope filter extraction
(prevents multi-table filter leakage)
+///
+/// # Returns
+/// A vector of unqualified filter expressions that can be passed to the
TableProvider for execution.
+/// Returns an empty vector if no applicable filters are found.
+///
+fn extract_dml_filters(
+ input: &Arc<LogicalPlan>,
+ target: &TableReference,
+) -> Result<Vec<Expr>> {
let mut filters = Vec::new();
+ let mut allowed_refs = vec![target.clone()];
+
+ // First pass: collect any alias references to the target table
+ input.apply(|node| {
+ if let LogicalPlan::SubqueryAlias(alias) = node
+ // Check if this alias points to the target table
+ && let LogicalPlan::TableScan(scan) = alias.input.as_ref()
+ && scan.table_name.resolved_eq(target)
+ {
+ allowed_refs.push(TableReference::bare(alias.alias.to_string()));
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
input.apply(|node| {
- if let LogicalPlan::Filter(filter) = node {
- // Split AND predicates into individual expressions
-
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+ match node {
+ LogicalPlan::Filter(filter) => {
+ // Split AND predicates into individual expressions
+ for predicate in split_conjunction(&filter.predicate) {
+ if predicate_is_on_target_multi(predicate, &allowed_refs)?
{
+ filters.push(predicate.clone());
+ }
+ }
+ }
+ LogicalPlan::TableScan(TableScan {
+ table_name,
+ filters: scan_filters,
+ ..
+ }) => {
+ // Only extract filters from the target table scan.
+ // This prevents incorrect filter extraction in UPDATE...FROM
scenarios
+ // where multiple table scans may have filters.
+ if table_name.resolved_eq(target) {
+ for filter in scan_filters {
+
filters.extend(split_conjunction(filter).into_iter().cloned());
+ }
+ }
+ }
+ // Plans without filter information
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Values(_)
+ | LogicalPlan::DescribeTable(_)
+ | LogicalPlan::Explain(_)
+ | LogicalPlan::Analyze(_)
+ | LogicalPlan::Distinct(_)
+ | LogicalPlan::Extension(_)
+ | LogicalPlan::Statement(_)
+ | LogicalPlan::Dml(_)
+ | LogicalPlan::Ddl(_)
+ | LogicalPlan::Copy(_)
+ | LogicalPlan::Unnest(_)
+ | LogicalPlan::RecursiveQuery(_) => {
+ // No filters to extract from leaf/meta plans
+ }
+ // Plans with inputs (may contain filters in children)
+ LogicalPlan::Projection(_)
+ | LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Limit(_)
+ | LogicalPlan::Sort(_)
+ | LogicalPlan::Union(_)
+ | LogicalPlan::Join(_)
+ | LogicalPlan::Repartition(_)
+ | LogicalPlan::Aggregate(_)
+ | LogicalPlan::Window(_)
+ | LogicalPlan::Subquery(_) => {
+ // Filter information may appear in child nodes; continue
traversal
+ // to extract filters from Filter/TableScan nodes deeper in
the plan
+ }
}
Ok(TreeNodeRecursion::Continue)
})?;
- // Strip table qualifiers from column references
- filters.into_iter().map(strip_column_qualifiers).collect()
+ // Strip qualifiers and deduplicate. This ensures:
+ // 1. Only target-table predicates are retained from Filter nodes
+ // 2. Qualifiers stripped for TableProvider compatibility
+ // 3. Duplicates removed (from Filter nodes + TableScan.filters)
+ //
+ // Deduplication is necessary because filters may appear in both Filter
nodes
+ // and TableScan.filters when the optimizer performs partial (Inexact)
pushdown.
+ let mut seen_filters = HashSet::new();
+ filters
+ .into_iter()
+ .try_fold(Vec::new(), |mut deduped, filter| {
+ let unqualified = strip_column_qualifiers(filter).map_err(|e| {
+ e.context(format!(
+ "Failed to strip column qualifiers for DML filter on table
'{target}'"
+ ))
+ })?;
+ if seen_filters.insert(unqualified.clone()) {
+ deduped.push(unqualified);
+ }
+ Ok(deduped)
+ })
+}
+
+/// Determine whether a predicate references only columns from the target table
+/// or its aliases.
+///
+/// Columns may be qualified with the target table name or any of its aliases.
+/// Unqualified columns are also accepted as they implicitly belong to the
target table.
+fn predicate_is_on_target_multi(
+ expr: &Expr,
+ allowed_refs: &[TableReference],
+) -> Result<bool> {
+ let mut columns = HashSet::new();
+ expr_to_columns(expr, &mut columns)?;
+
+ // Short-circuit on first mismatch: returns false if any column references
a table not in allowed_refs.
+ // Columns are accepted if:
+ // 1. They are unqualified (no relation specified), OR
+ // 2. Their relation matches one of the allowed table references using
resolved equality
+ Ok(!columns.iter().any(|column| {
+ column.relation.as_ref().is_some_and(|relation| {
+ !allowed_refs
+ .iter()
+ .any(|allowed| relation.resolved_eq(allowed))
+ })
+ }))
}
/// Strip table qualifiers from column references in an expression.
diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs
b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
index c53819ffcc..8c4bae5e98 100644
--- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs
+++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
@@ -25,9 +25,12 @@ use async_trait::async_trait;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::logical_expr::Expr;
+use datafusion::logical_expr::{
+ Expr, LogicalPlan, TableProviderFilterPushDown, TableScan,
+};
use datafusion_catalog::Session;
use datafusion_common::ScalarValue;
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::empty::EmptyExec;
@@ -35,6 +38,8 @@ use datafusion_physical_plan::empty::EmptyExec;
struct CaptureDeleteProvider {
schema: SchemaRef,
received_filters: Arc<Mutex<Option<Vec<Expr>>>>,
+ filter_pushdown: TableProviderFilterPushDown,
+ per_filter_pushdown: Option<Vec<TableProviderFilterPushDown>>,
}
impl CaptureDeleteProvider {
@@ -42,6 +47,32 @@ impl CaptureDeleteProvider {
Self {
schema,
received_filters: Arc::new(Mutex::new(None)),
+ filter_pushdown: TableProviderFilterPushDown::Unsupported,
+ per_filter_pushdown: None,
+ }
+ }
+
+ fn new_with_filter_pushdown(
+ schema: SchemaRef,
+ filter_pushdown: TableProviderFilterPushDown,
+ ) -> Self {
+ Self {
+ schema,
+ received_filters: Arc::new(Mutex::new(None)),
+ filter_pushdown,
+ per_filter_pushdown: None,
+ }
+ }
+
+ fn new_with_per_filter_pushdown(
+ schema: SchemaRef,
+ per_filter_pushdown: Vec<TableProviderFilterPushDown>,
+ ) -> Self {
+ Self {
+ schema,
+ received_filters: Arc::new(Mutex::new(None)),
+ filter_pushdown: TableProviderFilterPushDown::Unsupported,
+ per_filter_pushdown: Some(per_filter_pushdown),
}
}
@@ -92,6 +123,19 @@ impl TableProvider for CaptureDeleteProvider {
Field::new("count", DataType::UInt64, false),
])))))
}
+
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> Result<Vec<TableProviderFilterPushDown>> {
+ if let Some(per_filter) = &self.per_filter_pushdown
+ && per_filter.len() == filters.len()
+ {
+ return Ok(per_filter.clone());
+ }
+
+ Ok(vec![self.filter_pushdown.clone(); filters.len()])
+ }
}
/// A TableProvider that captures filters and assignments passed to update().
@@ -100,6 +144,8 @@ struct CaptureUpdateProvider {
schema: SchemaRef,
received_filters: Arc<Mutex<Option<Vec<Expr>>>>,
received_assignments: Arc<Mutex<Option<Vec<(String, Expr)>>>>,
+ filter_pushdown: TableProviderFilterPushDown,
+ per_filter_pushdown: Option<Vec<TableProviderFilterPushDown>>,
}
impl CaptureUpdateProvider {
@@ -108,6 +154,21 @@ impl CaptureUpdateProvider {
schema,
received_filters: Arc::new(Mutex::new(None)),
received_assignments: Arc::new(Mutex::new(None)),
+ filter_pushdown: TableProviderFilterPushDown::Unsupported,
+ per_filter_pushdown: None,
+ }
+ }
+
+ fn new_with_filter_pushdown(
+ schema: SchemaRef,
+ filter_pushdown: TableProviderFilterPushDown,
+ ) -> Self {
+ Self {
+ schema,
+ received_filters: Arc::new(Mutex::new(None)),
+ received_assignments: Arc::new(Mutex::new(None)),
+ filter_pushdown,
+ per_filter_pushdown: None,
}
}
@@ -164,6 +225,19 @@ impl TableProvider for CaptureUpdateProvider {
Field::new("count", DataType::UInt64, false),
])))))
}
+
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> Result<Vec<TableProviderFilterPushDown>> {
+ if let Some(per_filter) = &self.per_filter_pushdown
+ && per_filter.len() == filters.len()
+ {
+ return Ok(per_filter.clone());
+ }
+
+ Ok(vec![self.filter_pushdown.clone(); filters.len()])
+ }
}
/// A TableProvider that captures whether truncate() was called.
@@ -307,6 +381,168 @@ async fn test_delete_complex_expr() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_delete_filter_pushdown_extracts_table_scan_filters() ->
Result<()> {
+ let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ let df = ctx.sql("DELETE FROM t WHERE id = 1").await?;
+ let optimized_plan = df.clone().into_optimized_plan()?;
+
+ let mut scan_filters = Vec::new();
+ optimized_plan.apply(|node| {
+ if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+ scan_filters.extend(filters.clone());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+
+ assert_eq!(scan_filters.len(), 1);
+ assert!(scan_filters[0].to_string().contains("id"));
+
+ df.collect().await?;
+
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert_eq!(filters.len(), 1);
+ assert!(filters[0].to_string().contains("id"));
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_compound_filters_with_pushdown() -> Result<()> {
+ let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'")
+ .await?
+ .collect()
+ .await?;
+
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ // Should receive both filters, not deduplicate valid separate predicates
+ assert_eq!(
+ filters.len(),
+ 2,
+ "compound filters should not be over-suppressed"
+ );
+
+ let filter_strs: Vec<String> = filters.iter().map(|f|
f.to_string()).collect();
+ assert!(
+ filter_strs.iter().any(|s| s.contains("id")),
+ "should contain id filter"
+ );
+ assert!(
+ filter_strs.iter().any(|s| s.contains("status")),
+ "should contain status filter"
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_mixed_filter_locations() -> Result<()> {
+ // Test mixed-location filters: some in Filter node, some in
TableScan.filters
+ // This happens when provider uses TableProviderFilterPushDown::Inexact,
+ // meaning it can push down some predicates but not others.
+ let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Inexact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ // Execute DELETE with compound WHERE clause
+ ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'")
+ .await?
+ .collect()
+ .await?;
+
+ // Verify that both predicates are extracted and passed to delete_from(),
+ // even though they may be split between Filter node and TableScan.filters
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert_eq!(
+ filters.len(),
+ 2,
+ "should extract both predicates (union of Filter and
TableScan.filters)"
+ );
+
+ let filter_strs: Vec<String> = filters.iter().map(|f|
f.to_string()).collect();
+ assert!(
+ filter_strs.iter().any(|s| s.contains("id")),
+ "should contain id filter"
+ );
+ assert!(
+ filter_strs.iter().any(|s| s.contains("status")),
+ "should contain status filter"
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_per_filter_pushdown_mixed_locations() -> Result<()> {
+ // Force per-filter pushdown decisions to exercise mixed locations in one
query.
+ // First predicate is pushed down (Exact), second stays as residual
(Unsupported).
+ let provider =
Arc::new(CaptureDeleteProvider::new_with_per_filter_pushdown(
+ test_schema(),
+ vec![
+ TableProviderFilterPushDown::Exact,
+ TableProviderFilterPushDown::Unsupported,
+ ],
+ ));
+
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ let df = ctx
+ .sql("DELETE FROM t WHERE id = 1 AND status = 'active'")
+ .await?;
+ let optimized_plan = df.clone().into_optimized_plan()?;
+
+ // Only the first predicate should be pushed to TableScan.filters.
+ let mut scan_filters = Vec::new();
+ optimized_plan.apply(|node| {
+ if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+ scan_filters.extend(filters.clone());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+ assert_eq!(scan_filters.len(), 1);
+ assert!(scan_filters[0].to_string().contains("id"));
+
+ // Both predicates should still reach the provider (union + dedup
behavior).
+ df.collect().await?;
+
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert_eq!(filters.len(), 2);
+
+ let filter_strs: Vec<String> = filters.iter().map(|f|
f.to_string()).collect();
+ assert!(
+ filter_strs.iter().any(|s| s.contains("id")),
+ "should contain pushed-down id filter"
+ );
+ assert!(
+ filter_strs.iter().any(|s| s.contains("status")),
+ "should contain residual status filter"
+ );
+
+ Ok(())
+}
+
#[tokio::test]
async fn test_update_assignments() -> Result<()> {
let provider = Arc::new(CaptureUpdateProvider::new(test_schema()));
@@ -330,6 +566,80 @@ async fn test_update_assignments() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_update_filter_pushdown_extracts_table_scan_filters() ->
Result<()> {
+ let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ let df = ctx.sql("UPDATE t SET value = 100 WHERE id = 1").await?;
+ let optimized_plan = df.clone().into_optimized_plan()?;
+
+ // Verify that the optimizer pushed down the filter into TableScan
+ let mut scan_filters = Vec::new();
+ optimized_plan.apply(|node| {
+ if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+ scan_filters.extend(filters.clone());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+
+ assert_eq!(scan_filters.len(), 1);
+ assert!(scan_filters[0].to_string().contains("id"));
+
+ // Execute the UPDATE and verify filters were extracted and passed to
update()
+ df.collect().await?;
+
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert_eq!(filters.len(), 1);
+ assert!(filters[0].to_string().contains("id"));
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_update_filter_pushdown_passes_table_scan_filters() -> Result<()>
{
+ let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ let df = ctx
+ .sql("UPDATE t SET value = 42 WHERE status = 'ready'")
+ .await?;
+ let optimized_plan = df.clone().into_optimized_plan()?;
+
+ let mut scan_filters = Vec::new();
+ optimized_plan.apply(|node| {
+ if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+ scan_filters.extend(filters.clone());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+
+ assert!(
+ !scan_filters.is_empty(),
+ "expected filter pushdown to populate TableScan filters"
+ );
+
+ df.collect().await?;
+
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert!(
+ !filters.is_empty(),
+ "expected filters extracted from TableScan during UPDATE"
+ );
+ Ok(())
+}
+
#[tokio::test]
async fn test_truncate_calls_provider() -> Result<()> {
let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
@@ -379,6 +689,120 @@ async fn test_unsupported_table_update() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_delete_target_table_scoping() -> Result<()> {
+ // Test that DELETE only extracts filters from the target table,
+ // not from other tables (important for DELETE...FROM safety)
+ let target_provider =
Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table(
+ "target_t",
+ Arc::clone(&target_provider) as Arc<dyn TableProvider>,
+ )?;
+
+ // For now, we test single-table DELETE
+ // and validate that the scoping logic is correct
+ let df = ctx.sql("DELETE FROM target_t WHERE id > 5").await?;
+ df.collect().await?;
+
+ let filters = target_provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert_eq!(filters.len(), 1);
+ assert!(
+ filters[0].to_string().contains("id"),
+ "Filter should be for id column"
+ );
+ assert!(
+ filters[0].to_string().contains("5"),
+ "Filter should contain the value 5"
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_update_from_drops_non_target_predicates() -> Result<()> {
+ // UPDATE ... FROM is currently not working
+ // TODO fix https://github.com/apache/datafusion/issues/19950
+ let target_provider =
Arc::new(CaptureUpdateProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t1", Arc::clone(&target_provider) as Arc<dyn
TableProvider>)?;
+
+ let source_schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("status", DataType::Utf8, true),
+ // t2-only column to avoid false negatives after qualifier stripping
+ Field::new("src_only", DataType::Utf8, true),
+ ]));
+ let source_table =
datafusion::datasource::empty::EmptyTable::new(source_schema);
+ ctx.register_table("t2", Arc::new(source_table))?;
+
+ let result = ctx
+ .sql(
+ "UPDATE t1 SET value = 1 FROM t2 \
+ WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10",
+ )
+ .await;
+
+ // Verify UPDATE ... FROM is rejected with appropriate error
+ // TODO fix https://github.com/apache/datafusion/issues/19950
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string().contains("UPDATE ... FROM is not supported"),
+ "Expected 'UPDATE ... FROM is not supported' error, got: {err}"
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_qualifier_stripping_and_validation() -> Result<()> {
+ // Test that filter qualifiers are properly stripped and validated
+ // Unqualified predicates should work fine
+ let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+ test_schema(),
+ TableProviderFilterPushDown::Exact,
+ ));
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+ // Execute DELETE with unqualified column reference
+ // (After parsing, the planner adds qualifiers, but our validation should
accept them)
+ let df = ctx.sql("DELETE FROM t WHERE id = 1").await?;
+ df.collect().await?;
+
+ let filters = provider
+ .captured_filters()
+ .expect("filters should be captured");
+ assert!(!filters.is_empty(), "Should have extracted filter");
+
+ // Verify qualifiers are stripped: check that Column expressions have no
qualifier
+ let has_qualified_column = filters[0]
+ .exists(|expr| Ok(matches!(expr, Expr::Column(col) if
col.relation.is_some())))?;
+ assert!(
+ !has_qualified_column,
+ "Filter should have unqualified columns after stripping"
+ );
+
+ // Also verify the string representation doesn't contain table qualifiers
+ let filter_str = filters[0].to_string();
+ assert!(
+ !filter_str.contains("t.id"),
+ "Filter should not contain qualified column reference, got:
{filter_str}"
+ );
+ assert!(
+ filter_str.contains("id") || filter_str.contains("1"),
+ "Filter should reference id column or the value 1, got: {filter_str}"
+ );
+ Ok(())
+}
+
#[tokio::test]
async fn test_unsupported_table_truncate() -> Result<()> {
let schema = test_schema();
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 32bc8cb244..b91e38e537 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1078,9 +1078,18 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
});
// TODO: support multiple tables in UPDATE SET FROM
if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
- plan_err!("Multiple tables in UPDATE SET FROM not yet
supported")?;
+ not_impl_err!(
+ "Multiple tables in UPDATE SET FROM not yet supported"
+ )?;
}
let update_from = from_clauses.and_then(|mut f| f.pop());
+
+ // UPDATE ... FROM is currently not working
+ // TODO fix https://github.com/apache/datafusion/issues/19950
+ if update_from.is_some() {
+ return not_impl_err!("UPDATE ... FROM is not supported");
+ }
+
if returning.is_some() {
plan_err!("Update-returning clause not yet supported")?;
}
diff --git a/datafusion/sqllogictest/test_files/update.slt
b/datafusion/sqllogictest/test_files/update.slt
index a652ae7633..1cd2b626e3 100644
--- a/datafusion/sqllogictest/test_files/update.slt
+++ b/datafusion/sqllogictest/test_files/update.slt
@@ -67,39 +67,48 @@ logical_plan
physical_plan_error This feature is not implemented: Physical plan does not
support logical expression ScalarSubquery(<subquery>)
# set from other table
-query TT
+# UPDATE ... FROM is currently unsupported
+# TODO fix https://github.com/apache/datafusion/issues/19950
+query error DataFusion error: This feature is not implemented: UPDATE ... FROM
is not supported
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and
t1.b > 'foo' and t2.c > 1.0;
-----
-logical_plan
-01)Dml: op=[Update] table=[t1]
-02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c,
CAST(Int64(1) AS Int32) AS d
-03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c >
Float64(1)
-04)------Cross Join:
-05)--------TableScan: t1
-06)--------TableScan: t2
-physical_plan
-01)CooperativeExec
-02)--DmlResultExec: rows_affected=0
+# test update from other table with actual data
statement ok
-create table t3(a int, b varchar, c double, d int);
+insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar',
4.0, 30);
+
+statement ok
+insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50),
(4, 'updated_b3', 1.5, 60);
+
+# UPDATE ... FROM is currently unsupported - qualifier stripping breaks source
column references
+# causing assignments like 'b = t2.b' to resolve to target table's 'b' instead
of source table's 'b'
+# TODO fix https://github.com/apache/datafusion/issues/19950
+statement error DataFusion error: This feature is not implemented: UPDATE ...
FROM is not supported
+update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b >
'foo' and t2.c > 1.0;
# set from multiple tables, DataFusion only supports from one table
-query error DataFusion error: Error during planning: Multiple tables in UPDATE
SET FROM not yet supported
+statement error DataFusion error: This feature is not implemented: Multiple
tables in UPDATE SET FROM not yet supported
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a
and t1.a = t3.a;
# test table alias
-query TT
+# UPDATE ... FROM is currently unsupported
+# TODO fix https://github.com/apache/datafusion/issues/19950
+statement error DataFusion error: This feature is not implemented: UPDATE ...
FROM is not supported
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a
and t.b > 'foo' and t2.c > 1.0;
-----
-logical_plan
-01)Dml: op=[Update] table=[t1]
-02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1)
AS Int32) AS d
-03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c >
Float64(1)
-04)------Cross Join:
-05)--------SubqueryAlias: t
-06)----------TableScan: t1
-07)--------TableScan: t2
-physical_plan
-01)CooperativeExec
-02)--DmlResultExec: rows_affected=0
+
+# test update with table alias with actual data
+statement ok
+delete from t1;
+
+statement ok
+delete from t2;
+
+statement ok
+insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple',
3.5, 15);
+
+statement ok
+insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200);
+
+# UPDATE ... FROM is currently unsupported
+# TODO fix https://github.com/apache/datafusion/issues/19950
+statement error DataFusion error: This feature is not implemented: UPDATE ...
FROM is not supported
+update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b >
'foo' and t2.c > 1.0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]