jerry-024 commented on code in PR #269:
URL: https://github.com/apache/paimon-rust/pull/269#discussion_r3109203003


##########
crates/integrations/datafusion/src/update.rs:
##########
@@ -141,6 +157,158 @@ async fn execute_update_once(
     ok_result(ctx, total_count)
 }
 
+// ---------------------------------------------------------------------------
+// Copy-on-Write path (append-only tables, no PK)
+// ---------------------------------------------------------------------------
+
+/// Execute UPDATE on an append-only table with retry on delete conflict.
+async fn execute_cow_update(
+    ctx: &SessionContext,
+    update: &Update,
+    table: &Table,
+) -> DFResult<DataFrame> {
+    retry_on_conflict("CoW UPDATE", is_delete_conflict, || {
+        execute_cow_update_once(ctx, update, table)
+    })
+    .await
+}
+
+/// Single attempt of CoW UPDATE execution.
+async fn execute_cow_update_once(
+    ctx: &SessionContext,
+    update: &Update,
+    table: &Table,
+) -> DFResult<DataFrame> {
+    let (columns, exprs) = extract_set_assignments(update)?;
+
+    let table_ref = update.table.to_string();
+    let where_str = update.selection.as_ref().map(|e| e.to_string());
+    let partition_set =
+        build_partition_set_from_where(ctx, table, &table_ref, 
where_str.as_deref()).await?;
+
+    let mut writer = CopyOnWriteMergeWriter::new(table, columns.clone(), 
partition_set)
+        .await
+        .map_err(to_datafusion_error)?;
+
+    let (has_data, cow_table_name) = register_cow_target_table(ctx, table, 
&writer).await?;
+    if !has_data {
+        return ok_result(ctx, 0);
+    }
+
+    let result =
+        execute_cow_update_inner(ctx, &columns, &exprs, &cow_table_name, 
update, &mut writer).await;
+    let _ = ctx.deregister_table(&cow_table_name);
+    let total_count = result?;
+
+    let messages = writer.prepare_commit().await.map_err(to_datafusion_error)?;
+    if !messages.is_empty() {
+        let commit = table.new_write_builder().new_commit();
+        commit.commit(messages).await.map_err(to_datafusion_error)?;
+    }
+
+    ok_result(ctx, total_count)
+}
+
+async fn execute_cow_update_inner(
+    ctx: &SessionContext,
+    columns: &[String],
+    exprs: &[String],
+    cow_table_name: &str,
+    update: &Update,
+    writer: &mut CopyOnWriteMergeWriter,
+) -> DFResult<u64> {
+    let select_parts: Vec<String> = 
std::iter::once("\"__paimon_file_idx\"".to_string())
+        .chain(std::iter::once("\"__paimon_row_offset\"".to_string()))
+        .chain(
+            columns
+                .iter()
+                .zip(exprs.iter())
+                .map(|(col, expr)| format!("{expr} AS \"__upd_{col}\"")),

Review Comment:
   **Must fix: inconsistent identifier quoting.** Here `col` is raw `id.value` 
(unescaped) wrapped as `"__upd_{col}"`. Elsewhere (`insert_select_clause`) 
`ins.columns` comes from `Ident::to_string()` (already quoted) then gets 
wrapped again in `"{field}"` — double-quoted. Same statement emits inconsistent 
SQL for a column containing `"`, and a user with DDL privileges who creates a 
column named `x" FROM t; DROP TABLE t; --` gets genuine injection. Fix: one 
helper `fn quote_identifier(name: &str) -> String { Ident { value: name.into(), 
quote_style: Some('"') }.to_string() }` and route every identifier 
interpolation through it. Remove the double-wrap in `insert_select_clause`.



##########
crates/integrations/datafusion/src/merge_into.rs:
##########
@@ -582,29 +1074,320 @@ fn extract_source_ref(source: &TableFactor) -> 
DFResult<(String, Option<String>)
     }
 }
 
-/// Return a DataFrame with a single "count" column.
-pub(crate) fn ok_result(ctx: &SessionContext, count: u64) -> 
DFResult<DataFrame> {
-    let schema = Arc::new(Schema::new(vec![Field::new(
-        "count",
-        ArrowDataType::UInt64,
-        false,
-    )]));
-    let batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![Arc::new(UInt64Array::from(vec![count]))],
-    )?;
-    ctx.read_batch(batch)
+/// Extract __paimon_file_idx and __paimon_row_offset columns from a JOIN 
result batch.
+pub(crate) fn extract_tracking_columns(
+    batch: &RecordBatch,
+) -> DFResult<(&Int32Array, &UInt32Array)> {
+    let file_idx_col = batch
+        .column_by_name("__paimon_file_idx")
+        .ok_or_else(|| DataFusionError::Internal("__paimon_file_idx not 
found".to_string()))?
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .ok_or_else(|| DataFusionError::Internal("__paimon_file_idx is not 
Int32".to_string()))?;
+
+    let row_offset_col = batch
+        .column_by_name("__paimon_row_offset")
+        .ok_or_else(|| DataFusionError::Internal("__paimon_row_offset not 
found".to_string()))?
+        .as_any()
+        .downcast_ref::<UInt32Array>()
+        .ok_or_else(|| {
+            DataFusionError::Internal("__paimon_row_offset is not 
UInt32".to_string())
+        })?;
+
+    Ok((file_idx_col, row_offset_col))
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use datafusion::prelude::SessionContext;
-    use datafusion::sql::sqlparser::dialect::GenericDialect;
-    use datafusion::sql::sqlparser::parser::Parser;
-    use paimon::catalog::{Catalog, Identifier};
-    use paimon::io::FileIOBuilder;
-    use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema};
+/// Read all files from a table via the CoW writer's file index, attach 
`__paimon_file_idx`
+/// and `__paimon_row_offset` tracking columns, and register the result as a 
MemTable.
+///
+/// Returns `(has_data, table_name)`. The caller must deregister the table 
when done.
+///
+/// Note: all matching partition files are loaded into memory at once. For 
partitions
+/// with many large files this may cause significant memory pressure. A future
+/// optimization could stream or batch-process files instead of materializing 
everything.
+pub(crate) async fn register_cow_target_table(
+    ctx: &SessionContext,
+    table: &Table,
+    writer: &CopyOnWriteMergeWriter,
+) -> DFResult<(bool, String)> {
+    let file_index = writer.file_index();
+    if file_index.is_empty() {
+        let table_name = next_cow_table_name("__cow_target");
+        return Ok((false, table_name));
+    }
+
+    // Read all files in parallel
+    let read_futures: Vec<_> = file_index
+        .iter()
+        .enumerate()
+        .map(|(file_idx, file_info)| async move {
+            let single_split = DataSplitBuilder::new()
+                .with_snapshot(file_info.snapshot_id)
+                .with_partition(
+                    
paimon::spec::BinaryRow::from_serialized_bytes(&file_info.partition)
+                        .map_err(to_datafusion_error)?,
+                )
+                .with_bucket(file_info.bucket)
+                .with_bucket_path(file_info.bucket_path.clone())
+                .with_total_buckets(file_info.total_buckets)
+                .with_data_files(vec![file_info.file_meta.clone()])
+                .build()
+                .map_err(to_datafusion_error)?;
+
+            let read = table
+                .new_read_builder()
+                .new_read()
+                .map_err(to_datafusion_error)?;
+            let batches: Vec<RecordBatch> = read
+                .to_arrow(&[single_split])
+                .map_err(to_datafusion_error)?
+                .try_collect()
+                .await
+                .map_err(to_datafusion_error)?;
+
+            Ok::<_, DataFusionError>((file_idx, batches))
+        })
+        .collect();
+
+    let file_results = futures::future::try_join_all(read_futures).await?;
+
+    let mut all_batches: Vec<RecordBatch> = Vec::new();
+    let mut schema: Option<Arc<Schema>> = None;
+
+    for (file_idx, batches) in file_results {
+        let mut row_offset = 0u32;
+        for batch in batches {
+            let num_rows = batch.num_rows();
+            if num_rows == 0 {
+                continue;
+            }
+
+            let file_idx_i32 = i32::try_from(file_idx).map_err(|_| {
+                DataFusionError::Internal(format!("file_idx {file_idx} exceeds 
i32 range"))
+            })?;
+            let num_rows_u32 = u32::try_from(num_rows).map_err(|_| {
+                DataFusionError::Internal(format!("batch num_rows {num_rows} 
exceeds u32 range"))
+            })?;
+            let file_idx_col = Arc::new(Int32Array::from(vec![file_idx_i32; 
num_rows]));
+            let end_offset = 
row_offset.checked_add(num_rows_u32).ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "row_offset overflow: {row_offset} + {num_rows_u32}"
+                ))
+            })?;
+            let row_offset_col = Arc::new(UInt32Array::from(
+                (row_offset..end_offset).collect::<Vec<_>>(),
+            ));
+
+            let mut fields: Vec<Field> = batch
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.as_ref().clone())
+                .collect();
+            fields.push(Field::new("__paimon_file_idx", ArrowDataType::Int32, 
false));
+            fields.push(Field::new(
+                "__paimon_row_offset",
+                ArrowDataType::UInt32,
+                false,
+            ));
+            let augmented_schema = Arc::new(Schema::new(fields));
+
+            let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();
+            columns.push(file_idx_col);
+            columns.push(row_offset_col);
+
+            let augmented = RecordBatch::try_new(augmented_schema.clone(), 
columns)
+                .map_err(|e| DataFusionError::Internal(format!("Failed to 
augment batch: {e}")))?;
+
+            if schema.is_none() {
+                schema = Some(augmented.schema());
+            }
+            all_batches.push(augmented);
+            row_offset = end_offset;
+        }
+    }
+
+    let has_data = !all_batches.is_empty();
+    let table_name = next_cow_table_name("__cow_target");
+
+    if has_data {
+        let s = schema.unwrap();
+        let mem_table = MemTable::try_new(s, vec![all_batches])?;
+        ctx.register_table(&table_name, Arc::new(mem_table))?;
+    }
+
+    Ok((has_data, table_name))
+}
+
+/// Build a partition set from Arrow batches containing partition column 
values.
+///
+/// Converts each row's partition columns into serialized `BinaryRow` bytes.
+/// Returns `None` for non-partitioned tables.
+pub(crate) fn build_partition_set_from_batches(
+    table: &Table,
+    batches: &[RecordBatch],
+) -> DFResult<Option<HashSet<Vec<u8>>>> {
+    let partition_keys = table.schema().partition_keys();
+    if partition_keys.is_empty() {
+        return Ok(None);
+    }
+
+    let partition_fields = table.schema().partition_fields();
+    let mut partition_set = HashSet::new();
+
+    for batch in batches {
+        for row in 0..batch.num_rows() {
+            let datums: Vec<(Option<paimon::spec::Datum>, 
paimon::spec::DataType)> =
+                partition_fields
+                    .iter()
+                    .enumerate()
+                    .map(|(col_idx, field)| {
+                        let datum =
+                            extract_datum_from_arrow(batch, row, col_idx, 
field.data_type())
+                                .map_err(to_datafusion_error)?;
+                        Ok((datum, field.data_type().clone()))
+                    })
+                    .collect::<DFResult<_>>()?;
+            let refs: Vec<(&Option<paimon::spec::Datum>, 
&paimon::spec::DataType)> =
+                datums.iter().map(|(d, t)| (d, t)).collect();
+            partition_set.insert(datums_to_binary_row(&refs));
+        }
+    }
+
+    Ok(Some(partition_set))
+}
+
+/// Query a table for distinct partition values matching an optional WHERE 
clause.
+///
+/// Returns `None` for non-partitioned tables.
+pub(crate) async fn build_partition_set_from_where(
+    ctx: &SessionContext,
+    table: &Table,
+    table_ref: &str,
+    where_clause: Option<&str>,
+) -> DFResult<Option<HashSet<Vec<u8>>>> {
+    let partition_keys = table.schema().partition_keys();
+    if partition_keys.is_empty() {
+        return Ok(None);
+    }
+
+    let cols = partition_keys
+        .iter()
+        .map(|k| format!("\"{k}\""))
+        .collect::<Vec<_>>()
+        .join(", ");
+    let where_part = match where_clause {
+        Some(w) => format!(" WHERE {w}"),
+        None => String::new(),
+    };
+    let sql = format!("SELECT DISTINCT {cols} FROM {table_ref}{where_part}");
+    let batches = ctx.sql(&sql).await?.collect().await?;
+
+    build_partition_set_from_batches(table, &batches)
+}
+
+/// Query source table for distinct partition values and build a partition set.
+///
+/// Returns `None` for non-partitioned tables.
+async fn build_source_partition_set(
+    ctx: &SessionContext,
+    table: &Table,
+    source_ref: &str,
+    s_alias: &str,
+) -> DFResult<Option<HashSet<Vec<u8>>>> {
+    let partition_keys = table.schema().partition_keys();
+    if partition_keys.is_empty() {
+        return Ok(None);
+    }
+
+    let cols = partition_keys
+        .iter()
+        .map(|k| format!("{s_alias}.\"{k}\""))
+        .collect::<Vec<_>>()
+        .join(", ");
+    let sql = format!("SELECT DISTINCT {cols} FROM {source_ref} AS {s_alias}");

Review Comment:
   **Must fix: partitioned MERGE is incorrect.** This assumes the source has 
columns named identically to the target's partition keys. If source lacks them 
→ runtime `column not found`; if source has same-named columns with different 
meaning, or the ON maps a differently-named source column (`t.dt = 
s.event_date`), `partition_set` silently omits partitions the MERGE actually 
touches — rows that should UPDATE get inserted as NOT MATCHED. Minimum fix: 
fall back to `Ok(None)` (full-partition scan). Proper fix: resolve 
source→target partition mapping from the ON condition.



##########
crates/integrations/datafusion/src/update.rs:
##########
@@ -141,6 +157,158 @@ async fn execute_update_once(
     ok_result(ctx, total_count)
 }
 
+// ---------------------------------------------------------------------------
+// Copy-on-Write path (append-only tables, no PK)
+// ---------------------------------------------------------------------------
+
+/// Execute UPDATE on an append-only table with retry on delete conflict.
+async fn execute_cow_update(
+    ctx: &SessionContext,
+    update: &Update,
+    table: &Table,
+) -> DFResult<DataFrame> {
+    retry_on_conflict("CoW UPDATE", is_delete_conflict, || {
+        execute_cow_update_once(ctx, update, table)
+    })
+    .await
+}
+
+/// Single attempt of CoW UPDATE execution.
+async fn execute_cow_update_once(
+    ctx: &SessionContext,
+    update: &Update,
+    table: &Table,
+) -> DFResult<DataFrame> {
+    let (columns, exprs) = extract_set_assignments(update)?;
+
+    let table_ref = update.table.to_string();

Review Comment:
   **Must fix: `UPDATE t AS x` parses but fails at runtime.** 
`update.table.to_string()` keeps the alias in `table_ref`, but the inner query 
selects from the generated `__cow_target_N` (no alias), so `WHERE x.c = 1` 
resolves against a non-existent name and errors with `column x.c not found`. 
Same pattern in `delete.rs` (`extract_delete_table_ref` drops the alias but the 
user's WHERE still uses it). Minimum acceptable fix: explicitly reject 
`alias.is_some()` at both entry points with `DataFusionError::Plan("table alias 
in UPDATE/DELETE is not yet supported")`. Full fix: register the temp table 
under the user's alias.



##########
crates/integrations/datafusion/src/merge_into.rs:
##########
@@ -84,7 +113,470 @@ pub(crate) fn is_row_id_conflict(err: &DataFusionError) -> 
bool {
     }
 }
 
-/// Single attempt of MERGE INTO execution.
+/// Check if a DataFusion error is caused by a delete conflict during commit.
+pub(crate) fn is_delete_conflict(err: &DataFusionError) -> bool {
+    match err {
+        DataFusionError::External(e) => e.to_string().contains("Delete 
conflict"),
+        _ => false,
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Data evolution path (existing)
+// ---------------------------------------------------------------------------
+
+/// Execute MERGE INTO on a data evolution table with retry on row ID conflict.
+async fn execute_data_evolution_merge(
+    ctx: &SessionContext,
+    merge: &Merge,
+    table: Table,
+) -> DFResult<DataFrame> {
+    retry_on_conflict("MERGE INTO", is_row_id_conflict, || {
+        execute_merge_into_once(ctx, merge, &table)
+    })
+    .await
+}
+
+// ---------------------------------------------------------------------------
+// Copy-on-Write path (append-only tables, no PK)
+// ---------------------------------------------------------------------------
+
+/// Parsed CoW merge clauses — supports UPDATE, DELETE, and INSERT.
+struct CowMergeClauses {
+    /// Ordered list of WHEN MATCHED clauses (preserves SQL ordering for 
correct semantics).
+    matched: Vec<CowMatchedClause>,
+    inserts: Vec<MergeInsertClause>,
+}
+
+/// A single WHEN MATCHED clause with optional predicate.
+struct CowMatchedClause {
+    action: CowMatchedAction,
+    predicate: Option<String>,
+}
+
+enum CowMatchedAction {
+    Update(MergeUpdateClause),
+    Delete,
+}
+
+/// Parse MERGE clauses for the CoW path (supports DELETE unlike the 
data-evolution parser).
+fn extract_cow_merge_clauses(merge: &Merge) -> DFResult<CowMergeClauses> {
+    let mut matched: Vec<CowMatchedClause> = Vec::new();
+    let mut inserts: Vec<MergeInsertClause> = Vec::new();
+
+    for clause in &merge.clauses {
+        match clause.clause_kind {
+            MergeClauseKind::Matched => {
+                let predicate = clause.predicate.as_ref().map(|p| 
p.to_string());
+                match &clause.action {
+                    MergeAction::Update(update_expr) => {
+                        let mut columns = Vec::new();
+                        let mut exprs = Vec::new();
+                        for assignment in &update_expr.assignments {
+                            let col_name = match &assignment.target {
+                                AssignmentTarget::ColumnName(name) => name
+                                    .0
+                                    .last()
+                                    .and_then(|p| p.as_ident())
+                                    .map(|id| id.value.clone())
+                                    .ok_or_else(|| {
+                                        DataFusionError::Plan(format!(
+                                            "Invalid column name in SET: 
{name}"
+                                        ))
+                                    })?,
+                                AssignmentTarget::Tuple(_) => {
+                                    return Err(DataFusionError::Plan(
+                                        "Tuple assignment in MERGE INTO SET is 
not supported"
+                                            .to_string(),
+                                    ));
+                                }
+                            };
+                            columns.push(col_name);
+                            exprs.push(assignment.value.to_string());
+                        }
+                        matched.push(CowMatchedClause {
+                            action: CowMatchedAction::Update(MergeUpdateClause 
{ columns, exprs }),
+                            predicate,
+                        });
+                    }
+                    MergeAction::Delete { .. } => {
+                        matched.push(CowMatchedClause {
+                            action: CowMatchedAction::Delete,
+                            predicate,
+                        });
+                    }
+                    MergeAction::Insert(_) => {
+                        return Err(DataFusionError::Plan(
+                            "WHEN MATCHED THEN INSERT is not valid 
SQL".to_string(),
+                        ));
+                    }
+                }
+            }
+            MergeClauseKind::NotMatched | MergeClauseKind::NotMatchedByTarget 
=> {
+                match &clause.action {
+                    MergeAction::Insert(insert_expr) => {
+                        let columns: Vec<String> =
+                            insert_expr.columns.iter().map(|c| 
c.to_string()).collect();
+                        let value_exprs = match &insert_expr.kind {
+                            MergeInsertKind::Values(values) => {
+                                if values.rows.is_empty() {
+                                    return Err(DataFusionError::Plan(
+                                        "INSERT VALUES must have at least one 
row".to_string(),
+                                    ));
+                                }
+                                values.rows[0].iter().map(|e| 
e.to_string()).collect()
+                            }
+                            MergeInsertKind::Row => Vec::new(),
+                        };
+                        let predicate = clause.predicate.as_ref().map(|p| 
p.to_string());
+                        inserts.push(MergeInsertClause {
+                            columns,
+                            value_exprs,
+                            predicate,
+                        });
+                    }
+                    _ => {
+                        return Err(DataFusionError::Plan(
+                            "WHEN NOT MATCHED only supports 
INSERT".to_string(),
+                        ));
+                    }
+                }
+            }
+            MergeClauseKind::NotMatchedBySource => {
+                return Err(DataFusionError::Plan(
+                    "WHEN NOT MATCHED BY SOURCE is not yet supported for CoW 
MERGE INTO"
+                        .to_string(),
+                ));
+            }
+        }
+    }
+
+    if matched.is_empty() && inserts.is_empty() {
+        return Err(DataFusionError::Plan(
+            "MERGE INTO requires at least one WHEN MATCHED or WHEN NOT MATCHED 
clause".to_string(),
+        ));
+    }
+
+    Ok(CowMergeClauses { matched, inserts })
+}
+
+/// Execute MERGE INTO on an append-only table with retry on delete conflict.
+async fn execute_cow_merge(
+    ctx: &SessionContext,
+    merge: &Merge,
+    table: Table,
+) -> DFResult<DataFrame> {
+    retry_on_conflict("CoW MERGE INTO", is_delete_conflict, || {
+        execute_cow_merge_once(ctx, merge, &table)
+    })
+    .await
+}
+
+/// Execute a single attempt of CoW MERGE INTO.
+async fn execute_cow_merge_once(
+    ctx: &SessionContext,
+    merge: &Merge,
+    table: &Table,
+) -> DFResult<DataFrame> {
+    let mut clauses = extract_cow_merge_clauses(merge)?;
+
+    // Collect the union of all update columns across matched clauses 
(preserving order)
+    let mut update_columns: Vec<String> = Vec::new();
+    for mc in &clauses.matched {
+        if let CowMatchedAction::Update(upd) = &mc.action {
+            for col in &upd.columns {
+                if !update_columns.contains(col) {
+                    update_columns.push(col.clone());
+                }
+            }
+        }
+    }
+
+    let (source_ref, source_alias) = extract_source_ref(&merge.source)?;
+    let (target_ref, target_alias) = extract_table_ref(&merge.table)?;
+    let on_condition = merge.on.to_string();
+    let s_alias = source_alias.as_deref().unwrap_or(&source_ref);
+    let t_alias = target_alias.as_deref().unwrap_or("__cow_t");
+
+    // Build partition filter from source data to avoid scanning all partitions
+    let partition_set = build_source_partition_set(ctx, table, &source_ref, 
s_alias).await?;
+
+    let mut writer = CopyOnWriteMergeWriter::new(table, 
update_columns.clone(), partition_set)
+        .await
+        .map_err(to_datafusion_error)?;
+
+    // Rewrite ON condition and all clause expressions: replace original table 
references with aliases
+    let on_condition = rewrite_condition(&on_condition, &target_ref, t_alias, 
&source_ref, s_alias);
+    for mc in &mut clauses.matched {
+        if let Some(ref mut pred) = mc.predicate {
+            *pred = rewrite_condition(pred, &target_ref, t_alias, &source_ref, 
s_alias);
+        }
+        if let CowMatchedAction::Update(ref mut upd) = mc.action {
+            for expr in &mut upd.exprs {
+                *expr = rewrite_condition(expr, &target_ref, t_alias, 
&source_ref, s_alias);
+            }
+        }
+    }
+    for ins in &mut clauses.inserts {
+        for expr in &mut ins.value_exprs {
+            *expr = rewrite_condition(expr, &target_ref, t_alias, &source_ref, 
s_alias);
+        }
+        if let Some(ref mut pred) = ins.predicate {
+            *pred = rewrite_condition(pred, &target_ref, t_alias, &source_ref, 
s_alias);
+        }
+    }
+
+    // Read each target file individually, attach __paimon_file_idx and 
__paimon_row_offset
+    let (has_target_data, cow_target_name) = register_cow_target_table(ctx, 
table, &writer).await?;
+
+    let merge_ctx = CowMergeContext {
+        source_ref: &source_ref,
+        s_alias,
+        t_alias,
+        on_condition: &on_condition,
+        has_target_data,
+        cow_target_name: &cow_target_name,
+        update_columns: &update_columns,
+    };
+
+    let result = execute_cow_merge_inner(ctx, &clauses, &mut writer, table, 
&merge_ctx).await;
+
+    if has_target_data {
+        let _ = ctx.deregister_table(&cow_target_name);
+    }
+
+    let (insert_messages, total_count) = result?;
+
+    // CoW rewrite: prepare_commit consumes the writer
+    let cow_messages = 
writer.prepare_commit().await.map_err(to_datafusion_error)?;
+
+    let mut all_messages = cow_messages;
+    all_messages.extend(insert_messages);
+
+    if !all_messages.is_empty() {
+        let commit = table.new_write_builder().new_commit();
+        commit
+            .commit(all_messages)
+            .await
+            .map_err(to_datafusion_error)?;
+    }
+
+    ok_result(ctx, total_count)
+}
+
+/// Context for CoW merge inner execution — groups join-related parameters.
+struct CowMergeContext<'a> {
+    source_ref: &'a str,
+    s_alias: &'a str,
+    t_alias: &'a str,
+    on_condition: &'a str,
+    has_target_data: bool,
+    cow_target_name: &'a str,
+    update_columns: &'a [String],
+}
+
+/// Inner function that populates the CoW writer with matched operations and 
handles INSERT.
+/// Returns (insert_commit_messages, total_affected_count).
+async fn execute_cow_merge_inner(
+    ctx: &SessionContext,
+    clauses: &CowMergeClauses,
+    writer: &mut CopyOnWriteMergeWriter,
+    table: &Table,
+    merge_ctx: &CowMergeContext<'_>,
+) -> DFResult<(Vec<paimon::table::CommitMessage>, u64)> {
+    let source_ref = merge_ctx.source_ref;
+    let s_alias = merge_ctx.s_alias;
+    let t_alias = merge_ctx.t_alias;
+    let on_condition = merge_ctx.on_condition;
+    let has_target_data = merge_ctx.has_target_data;
+    let cow_target_name = merge_ctx.cow_target_name;
+    let update_columns = merge_ctx.update_columns;
+    let mut insert_messages = Vec::new();
+    let mut total_count: u64 = 0;
+
+    if has_target_data && !clauses.matched.is_empty() {
+        let mut update_value_batches: Vec<RecordBatch> = Vec::new();
+        let mut update_batch_counter: usize = 0;
+        // Track consumed predicates for correct multi-clause ordering:
+        // each clause only applies to rows NOT matched by any previous clause.
+        let mut consumed_predicates: Vec<String> = Vec::new();
+
+        for mc in &clauses.matched {
+            // Build WHERE clause: exclude rows consumed by previous clauses, 
then apply this predicate
+            let mut conditions: Vec<String> = Vec::new();
+            for prev in &consumed_predicates {
+                conditions.push(format!("NOT ({prev})"));
+            }
+            if let Some(ref pred) = mc.predicate {
+                conditions.push(pred.clone());
+                consumed_predicates.push(pred.clone());

Review Comment:
   **Must fix: multi-clause ordering is wrong.** When `mc.predicate` is `None` 
(unconditional clause), nothing is pushed into `consumed_predicates`, so later 
clauses re-match the same rows. Combined with `cow_writer.rs` strict checks, 
legal SQL like `WHEN MATCHED THEN UPDATE ... / WHEN MATCHED AND s.flag=1 THEN 
UPDATE ...` fails at commit with `duplicate UPDATE operations` or `both DELETE 
and UPDATE`. Fix: when predicate is `None`, 
`consumed_predicates.push("TRUE".into())`, or reject same-kind clauses after an 
unconditional one (unreachable by SQL semantics). Same bug in the inserts loop 
at line 825 — fix both.



##########
crates/integrations/datafusion/src/delete.rs:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DELETE execution for Paimon tables.
+//!
+//! Supports copy-on-write file rewriting for append-only tables (no PK, no 
deletion vectors).
+
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion::sql::sqlparser::ast::Delete;
+
+use paimon::spec::CoreOptions;
+use paimon::table::{CopyOnWriteMergeWriter, Table};
+
+use crate::error::to_datafusion_error;
+use crate::merge_into::{
+    build_partition_set_from_where, extract_tracking_columns, 
is_delete_conflict, ok_result,
+    register_cow_target_table, retry_on_conflict,
+};
+
+/// Execute a DELETE statement on a Paimon table.
+///
+/// `table_ref` is the SQL table reference string (e.g. `"paimon.test_db.t"`),
+/// already extracted by the caller for catalog resolution.
+pub(crate) async fn execute_delete(
+    ctx: &SessionContext,
+    delete: &Delete,
+    table: Table,
+    table_ref: &str,
+) -> DFResult<DataFrame> {
+    let schema = table.schema();
+    let core_options = CoreOptions::new(schema.options());
+
+    if core_options.data_evolution_enabled() {
+        return Err(DataFusionError::Plan(
+            "DELETE on data-evolution tables is not yet supported".to_string(),
+        ));
+    }
+    if !schema.trimmed_primary_keys().is_empty() {
+        return Err(DataFusionError::Plan(
+            "DELETE on primary-key tables is not yet supported".to_string(),
+        ));
+    }
+
+    execute_cow_delete(ctx, delete, &table, table_ref).await
+}
+
+/// Execute DELETE on an append-only table with retry on delete conflict.
+async fn execute_cow_delete(
+    ctx: &SessionContext,
+    delete: &Delete,
+    table: &Table,
+    table_ref: &str,
+) -> DFResult<DataFrame> {
+    retry_on_conflict("CoW DELETE", is_delete_conflict, || {
+        execute_cow_delete_once(ctx, delete, table, table_ref)
+    })
+    .await
+}
+
+/// Single attempt of CoW DELETE execution.
+async fn execute_cow_delete_once(
+    ctx: &SessionContext,
+    delete: &Delete,
+    table: &Table,
+    table_ref: &str,
+) -> DFResult<DataFrame> {
+    let where_str = delete.selection.as_ref().map(|e| e.to_string());
+    let partition_set =
+        build_partition_set_from_where(ctx, table, table_ref, 
where_str.as_deref()).await?;
+
+    let mut writer = CopyOnWriteMergeWriter::new(table, vec![], partition_set)
+        .await
+        .map_err(to_datafusion_error)?;
+
+    let (has_data, cow_table_name) = register_cow_target_table(ctx, table, 
&writer).await?;
+    if !has_data {
+        return ok_result(ctx, 0);
+    }
+
+    let result = execute_cow_delete_inner(ctx, &cow_table_name, delete, &mut 
writer).await;
+    let _ = ctx.deregister_table(&cow_table_name);

Review Comment:
   **Must fix: MemTable leaks on async cancellation.** Sequential `let _ = 
ctx.deregister_table(...)` is not RAII. If the future is cancelled (timeout, 
panic mid-SQL, ctx drop) between register and deregister, the `MemTable` (holds 
all target rows) stays in the `SessionContext`. `COW_TABLE_COUNTER` is 
monotonically increasing so names never collide — the leak is silent and 
unbounded on long-lived contexts. Fix: wrap `cow_table_name` in a Drop guard 
that calls `ctx.deregister_table` synchronously in `Drop`. Same pattern in 
`update.rs:200` and the merge_into register path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to