This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 53e6e8e  fix: validate data evolution matched row ids (#312)
53e6e8e is described below

commit 53e6e8ee331d8355a1f8572afa2d6a7ab999f28d
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 12 15:00:27 2026 +0800

    fix: validate data evolution matched row ids (#312)
---
 .../datafusion/tests/merge_into_tests.rs           |  31 +++
 crates/paimon/src/table/data_evolution_writer.rs   | 297 +++++++++++++++++----
 2 files changed, 276 insertions(+), 52 deletions(-)

diff --git a/crates/integrations/datafusion/tests/merge_into_tests.rs b/crates/integrations/datafusion/tests/merge_into_tests.rs
index 97d355b..0969c09 100644
--- a/crates/integrations/datafusion/tests/merge_into_tests.rs
+++ b/crates/integrations/datafusion/tests/merge_into_tests.rs
@@ -795,6 +795,37 @@ async fn test_merge_into_row_id_for_inserted_rows() {
     );
 }
 
+#[tokio::test]
+async fn test_merge_into_rejects_duplicate_matched_updates() {
+    let (_tmp, catalog) = create_test_env();
+    let sql_context = create_sql_context(catalog.clone()).await;
+    setup_data_evolution_table(&sql_context).await;
+
+    sql_context
+        .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    enable_data_evolution(&sql_context).await;
+
+    register_source(
+        &sql_context,
+        "CREATE TEMPORARY TABLE paimon.test_db.src_dup AS SELECT * FROM (VALUES (1, 'ALICE'), (1, 'ALICIA')) AS t(id, name)",
+    )
+    .await;
+
+    assert_merge_error(
+        &sql_context,
+        "MERGE INTO paimon.test_db.target t USING paimon.test_db.src_dup s ON t.id = s.id \
+         WHEN MATCHED THEN UPDATE SET name = s.name",
+        "duplicate UPDATE",
+    )
+    .await;
+}
+
 #[tokio::test]
 async fn test_rejects_when_matched_delete() {
     let (_tmp, catalog) = create_test_env();
diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs
index ce323b3..fe9e234 100644
--- a/crates/paimon/src/table/data_evolution_writer.rs
+++ b/crates/paimon/src/table/data_evolution_writer.rs
@@ -38,14 +38,14 @@ use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch};
 use arrow_select::concat::concat_batches;
 use arrow_select::interleave::interleave;
 use futures::TryStreamExt;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 /// Engine-agnostic writer for partial-column updates via `_ROW_ID`.
 ///
 /// Usage:
 /// 1. Create via [`DataEvolutionWriter::new`] (validates preconditions).
 /// 2. Feed matched rows via [`add_matched_batch`](Self::add_matched_batch).
-///    Each batch must contain a `_ROW_ID` (Int64) column plus the update columns.
+///    Each batch must contain a non-null `_ROW_ID` (Int64) column plus the update columns.
 /// 3. Call [`prepare_commit`](Self::prepare_commit) to produce `CommitMessage`s.
 /// 4. Commit via [`TableCommit`](super::TableCommit) (caller's responsibility).
 ///
@@ -122,20 +122,16 @@ impl DataEvolutionWriter {
     /// Add a batch of matched rows.
     ///
     /// The batch must contain:
-    /// - A `_ROW_ID` column (Int64) identifying which rows to update
+    /// - A non-null `_ROW_ID` column (Int64) identifying which rows to update
     /// - One column for each entry in `update_columns` with the new values
     pub fn add_matched_batch(&mut self, batch: RecordBatch) -> Result<()> {
         if batch.num_rows() == 0 {
             return Ok(());
         }
 
-        // Validate _ROW_ID column exists
-        if batch.column_by_name("_ROW_ID").is_none() {
-            return Err(crate::Error::DataInvalid {
-                message: "Matched batch must contain a '_ROW_ID' column".to_string(),
-                source: None,
-            });
-        }
+        let row_id_col = row_id_column(&batch)?;
+        validate_row_id_not_null(row_id_col)?;
+        validate_update_columns(&batch, &self.update_columns)?;
 
         self.matched_batches.push(batch);
         Ok(())
@@ -210,37 +206,7 @@ impl DataEvolutionWriter {
         }
 
         // 2. Group matched rows by their owning file
-        let mut file_matches: HashMap<usize, Vec<MatchedRow>> = HashMap::new();
-
-        for (batch_idx, batch) in self.matched_batches.iter().enumerate() {
-            let row_id_col = batch
-                .column_by_name("_ROW_ID")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<Int64Array>()
-                .ok_or_else(|| crate::Error::DataInvalid {
-                    message: "_ROW_ID column must be Int64".to_string(),
-                    source: None,
-                })?;
-
-            for row_idx in 0..batch.num_rows() {
-                let row_id = row_id_col.value(row_idx);
-                let (file_pos, file_range) =
-                    find_owning_file(&file_index, row_id).ok_or_else(|| {
-                        crate::Error::DataInvalid {
-                            message: format!("No file found for _ROW_ID {row_id}"),
-                            source: None,
-                        }
-                    })?;
-
-                let offset = (row_id - file_range.first_row_id) as usize;
-                file_matches.entry(file_pos).or_default().push(MatchedRow {
-                    offset,
-                    batch_idx,
-                    row_idx,
-                });
-            }
-        }
+        let file_matches = group_matched_rows_by_file(&self.matched_batches, &file_index)?;
 
         // 3. For each affected file: read original columns, apply updates, write partial files
         let mut writer = DataEvolutionPartialWriter::new(&self.table, self.update_columns.clone())?;
@@ -308,14 +274,6 @@ impl DataEvolutionWriter {
                 let original_col = original_batch.column(col_idx);
                 let original_dtype = original_col.data_type();
 
-                let join_col_idx = self.matched_batches[0]
-                    .schema()
-                    .index_of(col_name)
-                    .map_err(|e| crate::Error::DataInvalid {
-                        message: format!("Column {col_name} not found in matched batch: {e}"),
-                        source: None,
-                    })?;
-
                 // Gather update values into a single array (one entry per matched row, in offset order)
                 let update_indices: Vec<(usize, usize)> = sorted_matches
                     .iter()
@@ -332,16 +290,17 @@ impl DataEvolutionWriter {
                     let arr_idx = match batch_id_map.get(&batch_idx) {
                         Some(&idx) => idx,
                         None => {
-                            let src_col = self.matched_batches[batch_idx].column(join_col_idx);
+                            let src_col =
+                                matched_column(&self.matched_batches[batch_idx], col_name)?;
                             let casted = if src_col.data_type() != original_dtype {
-                                arrow_cast::cast(src_col, original_dtype).map_err(|e| {
+                                arrow_cast::cast(src_col.as_ref(), original_dtype).map_err(|e| {
                                     crate::Error::DataInvalid {
                                         message: format!("Failed to cast column {col_name}: {e}"),
                                         source: None,
                                     }
                                 })?
                             } else {
-                                src_col.clone()
+                                src_col
                             };
                             let idx = batch_arrays.len();
                             batch_arrays.push(casted);
@@ -421,6 +380,102 @@ fn find_owning_file(file_index: &[FileRowRange], row_id: i64) -> Option<(usize,
     }
 }
 
+fn group_matched_rows_by_file(
+    matched_batches: &[RecordBatch],
+    file_index: &[FileRowRange],
+) -> Result<HashMap<usize, Vec<MatchedRow>>> {
+    let mut file_matches: HashMap<usize, Vec<MatchedRow>> = HashMap::new();
+    let mut seen_updates: HashSet<(usize, usize)> = HashSet::new();
+
+    for (batch_idx, batch) in matched_batches.iter().enumerate() {
+        let row_id_col = row_id_column(batch)?;
+        validate_row_id_not_null(row_id_col)?;
+
+        for row_idx in 0..batch.num_rows() {
+            let row_id = row_id_col.value(row_idx);
+            let (file_pos, file_range) =
+                find_owning_file(file_index, row_id).ok_or_else(|| crate::Error::DataInvalid {
+                    message: format!("No file found for _ROW_ID {row_id}"),
+                    source: None,
+                })?;
+
+            let offset = (row_id - file_range.first_row_id) as usize;
+            if !seen_updates.insert((file_pos, offset)) {
+                return Err(crate::Error::DataInvalid {
+                    message: format!(
+                        "row_offset {offset} has duplicate UPDATE operations; \
+                         this may indicate a many-to-many join in the MERGE source"
+                    ),
+                    source: None,
+                });
+            }
+
+            file_matches.entry(file_pos).or_default().push(MatchedRow {
+                offset,
+                batch_idx,
+                row_idx,
+            });
+        }
+    }
+
+    Ok(file_matches)
+}
+
+fn row_id_column(batch: &RecordBatch) -> Result<&Int64Array> {
+    batch
+        .column_by_name("_ROW_ID")
+        .ok_or_else(|| crate::Error::DataInvalid {
+            message: "Matched batch must contain a '_ROW_ID' column".to_string(),
+            source: None,
+        })?
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .ok_or_else(|| crate::Error::DataInvalid {
+            message: "_ROW_ID column must be Int64".to_string(),
+            source: None,
+        })
+}
+
+fn validate_row_id_not_null(row_id_col: &Int64Array) -> Result<()> {
+    if row_id_col.null_count() == 0 {
+        return Ok(());
+    }
+
+    for row_idx in 0..row_id_col.len() {
+        if row_id_col.is_null(row_idx) {
+            return Err(crate::Error::DataInvalid {
+                message: format!("_ROW_ID must not be null at matched row {row_idx}"),
+                source: None,
+            });
+        }
+    }
+
+    Ok(())
+}
+
+fn validate_update_columns(batch: &RecordBatch, update_columns: &[String]) -> Result<()> {
+    for col in update_columns {
+        matched_column_index(batch, col)?;
+    }
+
+    Ok(())
+}
+
+fn matched_column_index(batch: &RecordBatch, col: &str) -> Result<usize> {
+    batch
+        .schema()
+        .index_of(col)
+        .map_err(|e| crate::Error::DataInvalid {
+            message: format!("Column {col} not found in matched batch: {e}"),
+            source: None,
+        })
+}
+
+fn matched_column(batch: &RecordBatch, col: &str) -> Result<ArrayRef> {
+    let idx = matched_column_index(batch, col)?;
+    Ok(batch.column(idx).clone())
+}
+
 struct FileRowRange {
     first_row_id: i64,
     last_row_id: i64,
@@ -682,6 +737,42 @@ mod tests {
         RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(names))]).unwrap()
     }
 
+    fn make_matched_batch(row_ids: Vec<Option<i64>>, names: Vec<&str>) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("_ROW_ID", ArrowDataType::Int64, true),
+            ArrowField::new("name", ArrowDataType::Utf8, true),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int64Array::from(row_ids)),
+                Arc::new(StringArray::from(names)),
+            ],
+        )
+        .unwrap()
+    }
+
+    fn make_matched_batch_with_schema(
+        fields: Vec<ArrowField>,
+        columns: Vec<ArrayRef>,
+    ) -> RecordBatch {
+        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).unwrap()
+    }
+
+    fn test_file_index() -> Vec<FileRowRange> {
+        vec![FileRowRange {
+            first_row_id: 10,
+            last_row_id: 12,
+            row_count: 3,
+            partition: vec![],
+            bucket: 0,
+            bucket_path: String::new(),
+            snapshot_id: 1,
+            total_buckets: 1,
+            files: vec![make_test_file_meta("base.parquet", 3, Some(10), 1, None)],
+        }]
+    }
+
     #[tokio::test]
     async fn test_write_partial_column_file() {
         let file_io = test_file_io();
@@ -742,6 +833,108 @@ mod tests {
         assert_eq!(files[1].row_count, 1);
     }
 
+    #[test]
+    fn test_add_matched_batch_rejects_null_row_id() {
+        let file_io = test_file_io();
+        let table = test_table(&file_io, "memory:/test_de_add_batch");
+        let mut writer = DataEvolutionWriter::new(&table, vec!["name".to_string()]).unwrap();
+
+        let err = writer
+            .add_matched_batch(make_matched_batch(
+                vec![Some(10), None],
+                vec!["alice", "bob"],
+            ))
+            .err()
+            .unwrap();
+
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("_ROW_ID must not be null"))
+        );
+    }
+
+    #[test]
+    fn test_add_matched_batch_rejects_missing_update_column() {
+        let file_io = test_file_io();
+        let table = test_table(&file_io, "memory:/test_de_add_batch_missing_col");
+        let mut writer = DataEvolutionWriter::new(&table, vec!["value".to_string()]).unwrap();
+
+        let err = writer
+            .add_matched_batch(make_matched_batch(vec![Some(10)], vec!["alice"]))
+            .err()
+            .unwrap();
+
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("Column value not found"))
+        );
+    }
+
+    #[test]
+    fn test_group_matched_rows_rejects_null_row_id() {
+        let batches = vec![make_matched_batch(
+            vec![Some(10), None],
+            vec!["alice", "bob"],
+        )];
+
+        let err = group_matched_rows_by_file(&batches, &test_file_index())
+            .err()
+            .unwrap();
+
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("_ROW_ID must not be null"))
+        );
+    }
+
+    #[test]
+    fn test_group_matched_rows_rejects_duplicate_update() {
+        let batches = vec![make_matched_batch(
+            vec![Some(10), Some(10)],
+            vec!["alice", "ALICE"],
+        )];
+
+        let err = group_matched_rows_by_file(&batches, &test_file_index())
+            .err()
+            .unwrap();
+
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("duplicate UPDATE"))
+        );
+    }
+
+    #[test]
+    fn test_group_matched_rows_rejects_duplicate_update_across_batches() {
+        let batches = vec![
+            make_matched_batch(vec![Some(10)], vec!["alice"]),
+            make_matched_batch(vec![Some(10)], vec!["ALICE"]),
+        ];
+
+        let err = group_matched_rows_by_file(&batches, &test_file_index())
+            .err()
+            .unwrap();
+
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("duplicate UPDATE"))
+        );
+    }
+
+    #[test]
+    fn test_matched_column_uses_batch_schema() {
+        let batch = make_matched_batch_with_schema(
+            vec![
+                ArrowField::new("value", ArrowDataType::Int32, true),
+                ArrowField::new("name", ArrowDataType::Utf8, true),
+            ],
+            vec![
+                Arc::new(arrow_array::Int32Array::from(vec![20])),
+                Arc::new(StringArray::from(vec!["bob"])),
+            ],
+        );
+
+        let column = matched_column(&batch, "name").unwrap();
+        let names = column.as_any().downcast_ref::<StringArray>().unwrap();
+
+        assert_eq!(names.value(0), "bob");
+    }
+
     #[test]
     fn test_find_owning_file_with_grouped_ranges() {
         // Simulate a file group: base file (3 cols, 100 rows) + partial file (1 col, 100 rows)

Reply via email to