alamb commented on code in PR #4902:
URL: https://github.com/apache/arrow-datafusion/pull/4902#discussion_r1070551813


##########
datafusion/sql/src/statement.rs:
##########
@@ -505,6 +533,192 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }))
     }
 
+    fn delete_to_plan(
+        &self,
+        table_factor: TableFactor,
+        predicate_expr: Option<Expr>,
+    ) -> Result<LogicalPlan> {
+        let table_name = match &table_factor {
+            TableFactor::Table { name, .. } => name.clone(),
+            _ => Err(DataFusionError::Plan(
+                "Unsupported table type for delete!".to_string(),
+            ))?,
+        };
+
+        // Do a table lookup to verify the table exists
+        let table_ref = object_name_to_table_reference(table_name.clone())?;
+        let provider = self
+            .schema_provider
+            .get_table_provider((&table_ref).into())?;
+        let schema = (*provider.schema()).clone();
+        let schema = DFSchema::try_from(schema)?;
+        let scan =
+            LogicalPlanBuilder::scan(table_name.to_string(), provider, 
None)?.build()?;
+        let mut planner_context = PlannerContext::new();
+
+        let source = match predicate_expr {
+            None => scan,
+            Some(predicate_expr) => {
+                let filter_expr =
+                    self.sql_to_expr(predicate_expr, &schema, &mut 
planner_context)?;
+                let schema = Arc::new(schema.clone());
+                let mut using_columns = HashSet::new();
+                expr_to_columns(&filter_expr, &mut using_columns)?;
+                let filter_expr = normalize_col_with_schemas(
+                    filter_expr,
+                    &[&schema],
+                    &[using_columns],
+                )?;
+                LogicalPlan::Filter(Filter::try_new(filter_expr, 
Arc::new(scan))?)
+            }
+        };
+
+        let plan = LogicalPlan::Write(WriteRel {
+            table_name: table_ref,
+            table_schema: schema.into(),
+            op: WriteOp::Delete,
+            input: Arc::new(source),
+        });
+        Ok(plan)
+    }
+
+    fn update_to_plan(
+        &self,
+        table: TableWithJoins,
+        assignments: Vec<Assignment>,
+        from: Option<TableWithJoins>,
+        predicate_expr: Option<Expr>,
+    ) -> Result<LogicalPlan> {
+        let table_name = match &table.relation {
+            TableFactor::Table { name, .. } => name.clone(),
+            _ => Err(DataFusionError::Plan(
+                "Unsupported table type for update!".to_string(),
+            ))?,
+        };
+
+        // Do a table lookup to verify the table exists
+        let table_ref = object_name_to_table_reference(table_name)?;
+        let provider = self
+            .schema_provider
+            .get_table_provider((&table_ref).into())?;
+
+        // Build schema
+        let mut planner_context = PlannerContext::new();
+        let table_schema = provider.schema();
+        let mut fields = vec![];
+        let mut values = vec![];
+        for assign in assignments.iter() {
+            let col_name: &Ident = assign
+                .id
+                .iter()
+                .last()
+                .ok_or(DataFusionError::Plan("Empty column id".to_string()))?;
+            let col_name = col_name.value.as_str();
+            values.push((col_name.to_string(), assign.value.clone()));
+            let (_, field) =
+                table_schema
+                    .column_with_name(col_name)
+                    .ok_or(DataFusionError::Plan(format!(
+                        "Column not found: {col_name}"
+                    )))?;
+            let field = DFField::from(field.clone());
+            fields.push(field);
+        }
+
+        // Build scan
+        let from = from.unwrap_or(table);
+        let scan = self.plan_from_tables(vec![from], &mut planner_context)?;
+
+        // Filter
+        let source = match predicate_expr {
+            None => scan,
+            Some(predicate_expr) => {
+                let plan_schema = scan.schema();
+                let filter_expr =
+                    self.sql_to_expr(predicate_expr, plan_schema, &mut 
planner_context)?;
+                let schema = Arc::new(plan_schema.clone());
+                let mut using_columns = HashSet::new();
+                expr_to_columns(&filter_expr, &mut using_columns)?;
+                for use_col in using_columns.iter() {
+                    let col_name = use_col.name.clone();
+                    let rel = use_col.relation.as_ref().cloned();
+                    let field = plan_schema
+                        .field_with_name(rel.as_deref(), 
use_col.name.as_str())?
+                        .clone();
+                    fields.push(field.clone());
+                    values.push((
+                        col_name.clone(),
+                        
ast::Expr::Identifier(ast::Ident::from(col_name.as_str())),
+                    ));
+                }
+                let filter_expr = normalize_col_with_schemas(
+                    filter_expr,
+                    &[&schema],
+                    &[using_columns],
+                )?;
+                LogicalPlan::Filter(Filter::try_new(filter_expr, 
Arc::new(scan))?)
+            }
+        };
+
+        // Projection
+        let proj_schema = DFSchema::new_with_metadata(fields, HashMap::new())?;
+        let mut exprs = vec![];
+        for (col_name, expr) in values.into_iter() {
+            let expr = self.sql_to_expr(expr, &proj_schema, &mut 
planner_context)?;
+            let expr = expr.alias(col_name);
+            exprs.push(expr);
+        }
+        let source = project(source, exprs)?;
+
+        let plan = LogicalPlan::Write(WriteRel {
+            table_name: table_ref,
+            table_schema: proj_schema.into(),

Review Comment:
   The way I have seen updates work in the past is that an operation like 
`UPDATE` with a where clause looks something like the following (this is what 
you have in the plans)
   
   ```sql
   UPDATE foo set col = 'new_val' where x = 5
   ```
   
   The corresponding plan input looks something like
   
   ```text
   Update
      Projection: 'new_val' as col, x, y, z, ...
        Filter: expr x=5
          Table Scan foo cols = *
   ```
   
   i.e., the engine creates a plan that effectively selects the rows to update, whose output has the same schema as the target table:
   
   ```
   select 'new_val' as col, x, y, z from foo where x = 5;
   ```
   
   
   The exact details of what is selected / passed through differ by implementation (e.g., a row-id).
   
   So this is a long-winded way of saying I would expect the schema on the `WriteRel` node to be the target table's schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to