This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 53e6e8e fix: validate data evolution matched row ids (#312)
53e6e8e is described below
commit 53e6e8ee331d8355a1f8572afa2d6a7ab999f28d
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 12 15:00:27 2026 +0800
fix: validate data evolution matched row ids (#312)
---
.../datafusion/tests/merge_into_tests.rs | 31 +++
crates/paimon/src/table/data_evolution_writer.rs | 297 +++++++++++++++++----
2 files changed, 276 insertions(+), 52 deletions(-)
diff --git a/crates/integrations/datafusion/tests/merge_into_tests.rs b/crates/integrations/datafusion/tests/merge_into_tests.rs
index 97d355b..0969c09 100644
--- a/crates/integrations/datafusion/tests/merge_into_tests.rs
+++ b/crates/integrations/datafusion/tests/merge_into_tests.rs
@@ -795,6 +795,37 @@ async fn test_merge_into_row_id_for_inserted_rows() {
);
}
+#[tokio::test]
+async fn test_merge_into_rejects_duplicate_matched_updates() {
+ let (_tmp, catalog) = create_test_env();
+ let sql_context = create_sql_context(catalog.clone()).await;
+ setup_data_evolution_table(&sql_context).await;
+
+ sql_context
+ .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ enable_data_evolution(&sql_context).await;
+
+ register_source(
+ &sql_context,
+ "CREATE TEMPORARY TABLE paimon.test_db.src_dup AS SELECT * FROM (VALUES (1, 'ALICE'), (1, 'ALICIA')) AS t(id, name)",
+ )
+ .await;
+
+ assert_merge_error(
+ &sql_context,
+ "MERGE INTO paimon.test_db.target t USING paimon.test_db.src_dup s ON t.id = s.id \
+ WHEN MATCHED THEN UPDATE SET name = s.name",
+ "duplicate UPDATE",
+ )
+ .await;
+}
+
#[tokio::test]
async fn test_rejects_when_matched_delete() {
let (_tmp, catalog) = create_test_env();
diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs
index ce323b3..fe9e234 100644
--- a/crates/paimon/src/table/data_evolution_writer.rs
+++ b/crates/paimon/src/table/data_evolution_writer.rs
@@ -38,14 +38,14 @@ use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch};
use arrow_select::concat::concat_batches;
use arrow_select::interleave::interleave;
use futures::TryStreamExt;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
/// Engine-agnostic writer for partial-column updates via `_ROW_ID`.
///
/// Usage:
/// 1. Create via [`DataEvolutionWriter::new`] (validates preconditions).
/// 2. Feed matched rows via [`add_matched_batch`](Self::add_matched_batch).
-/// Each batch must contain a `_ROW_ID` (Int64) column plus the update
columns.
+/// Each batch must contain a non-null `_ROW_ID` (Int64) column plus the
update columns.
/// 3. Call [`prepare_commit`](Self::prepare_commit) to produce
`CommitMessage`s.
/// 4. Commit via [`TableCommit`](super::TableCommit) (caller's
responsibility).
///
@@ -122,20 +122,16 @@ impl DataEvolutionWriter {
/// Add a batch of matched rows.
///
/// The batch must contain:
- /// - A `_ROW_ID` column (Int64) identifying which rows to update
+ /// - A non-null `_ROW_ID` column (Int64) identifying which rows to update
/// - One column for each entry in `update_columns` with the new values
pub fn add_matched_batch(&mut self, batch: RecordBatch) -> Result<()> {
if batch.num_rows() == 0 {
return Ok(());
}
- // Validate _ROW_ID column exists
- if batch.column_by_name("_ROW_ID").is_none() {
- return Err(crate::Error::DataInvalid {
- message: "Matched batch must contain a '_ROW_ID' column".to_string(),
- source: None,
- });
- }
+ let row_id_col = row_id_column(&batch)?;
+ validate_row_id_not_null(row_id_col)?;
+ validate_update_columns(&batch, &self.update_columns)?;
self.matched_batches.push(batch);
Ok(())
@@ -210,37 +206,7 @@ impl DataEvolutionWriter {
}
// 2. Group matched rows by their owning file
- let mut file_matches: HashMap<usize, Vec<MatchedRow>> = HashMap::new();
-
- for (batch_idx, batch) in self.matched_batches.iter().enumerate() {
- let row_id_col = batch
- .column_by_name("_ROW_ID")
- .unwrap()
- .as_any()
- .downcast_ref::<Int64Array>()
- .ok_or_else(|| crate::Error::DataInvalid {
- message: "_ROW_ID column must be Int64".to_string(),
- source: None,
- })?;
-
- for row_idx in 0..batch.num_rows() {
- let row_id = row_id_col.value(row_idx);
- let (file_pos, file_range) =
- find_owning_file(&file_index, row_id).ok_or_else(|| {
- crate::Error::DataInvalid {
- message: format!("No file found for _ROW_ID {row_id}"),
- source: None,
- }
- })?;
-
- let offset = (row_id - file_range.first_row_id) as usize;
- file_matches.entry(file_pos).or_default().push(MatchedRow {
- offset,
- batch_idx,
- row_idx,
- });
- }
- }
+ let file_matches = group_matched_rows_by_file(&self.matched_batches,
&file_index)?;
// 3. For each affected file: read original columns, apply updates,
write partial files
let mut writer = DataEvolutionPartialWriter::new(&self.table,
self.update_columns.clone())?;
@@ -308,14 +274,6 @@ impl DataEvolutionWriter {
let original_col = original_batch.column(col_idx);
let original_dtype = original_col.data_type();
- let join_col_idx = self.matched_batches[0]
- .schema()
- .index_of(col_name)
- .map_err(|e| crate::Error::DataInvalid {
- message: format!("Column {col_name} not found in matched batch: {e}"),
- source: None,
- })?;
-
// Gather update values into a single array (one entry per
matched row, in offset order)
let update_indices: Vec<(usize, usize)> = sorted_matches
.iter()
@@ -332,16 +290,17 @@ impl DataEvolutionWriter {
let arr_idx = match batch_id_map.get(&batch_idx) {
Some(&idx) => idx,
None => {
- let src_col =
self.matched_batches[batch_idx].column(join_col_idx);
+ let src_col =
+
matched_column(&self.matched_batches[batch_idx], col_name)?;
let casted = if src_col.data_type() !=
original_dtype {
- arrow_cast::cast(src_col,
original_dtype).map_err(|e| {
+ arrow_cast::cast(src_col.as_ref(),
original_dtype).map_err(|e| {
crate::Error::DataInvalid {
message: format!("Failed to cast column {col_name}: {e}"),
source: None,
}
})?
} else {
- src_col.clone()
+ src_col
};
let idx = batch_arrays.len();
batch_arrays.push(casted);
@@ -421,6 +380,102 @@ fn find_owning_file(file_index: &[FileRowRange], row_id: i64) -> Option<(usize,
}
}
+fn group_matched_rows_by_file(
+ matched_batches: &[RecordBatch],
+ file_index: &[FileRowRange],
+) -> Result<HashMap<usize, Vec<MatchedRow>>> {
+ let mut file_matches: HashMap<usize, Vec<MatchedRow>> = HashMap::new();
+ let mut seen_updates: HashSet<(usize, usize)> = HashSet::new();
+
+ for (batch_idx, batch) in matched_batches.iter().enumerate() {
+ let row_id_col = row_id_column(batch)?;
+ validate_row_id_not_null(row_id_col)?;
+
+ for row_idx in 0..batch.num_rows() {
+ let row_id = row_id_col.value(row_idx);
+ let (file_pos, file_range) =
+ find_owning_file(file_index, row_id).ok_or_else(||
crate::Error::DataInvalid {
+ message: format!("No file found for _ROW_ID {row_id}"),
+ source: None,
+ })?;
+
+ let offset = (row_id - file_range.first_row_id) as usize;
+ if !seen_updates.insert((file_pos, offset)) {
+ return Err(crate::Error::DataInvalid {
+ message: format!(
+ "row_offset {offset} has duplicate UPDATE operations; \
+ this may indicate a many-to-many join in the MERGE source"
+ ),
+ source: None,
+ });
+ }
+
+ file_matches.entry(file_pos).or_default().push(MatchedRow {
+ offset,
+ batch_idx,
+ row_idx,
+ });
+ }
+ }
+
+ Ok(file_matches)
+}
+
+fn row_id_column(batch: &RecordBatch) -> Result<&Int64Array> {
+ batch
+ .column_by_name("_ROW_ID")
+ .ok_or_else(|| crate::Error::DataInvalid {
+ message: "Matched batch must contain a '_ROW_ID' column".to_string(),
+ source: None,
+ })?
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .ok_or_else(|| crate::Error::DataInvalid {
+ message: "_ROW_ID column must be Int64".to_string(),
+ source: None,
+ })
+}
+
+fn validate_row_id_not_null(row_id_col: &Int64Array) -> Result<()> {
+ if row_id_col.null_count() == 0 {
+ return Ok(());
+ }
+
+ for row_idx in 0..row_id_col.len() {
+ if row_id_col.is_null(row_idx) {
+ return Err(crate::Error::DataInvalid {
+ message: format!("_ROW_ID must not be null at matched row {row_idx}"),
+ source: None,
+ });
+ }
+ }
+
+ Ok(())
+}
+
+fn validate_update_columns(batch: &RecordBatch, update_columns: &[String]) ->
Result<()> {
+ for col in update_columns {
+ matched_column_index(batch, col)?;
+ }
+
+ Ok(())
+}
+
+fn matched_column_index(batch: &RecordBatch, col: &str) -> Result<usize> {
+ batch
+ .schema()
+ .index_of(col)
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Column {col} not found in matched batch: {e}"),
+ source: None,
+ })
+}
+
+fn matched_column(batch: &RecordBatch, col: &str) -> Result<ArrayRef> {
+ let idx = matched_column_index(batch, col)?;
+ Ok(batch.column(idx).clone())
+}
+
struct FileRowRange {
first_row_id: i64,
last_row_id: i64,
@@ -682,6 +737,42 @@ mod tests {
RecordBatch::try_new(schema,
vec![Arc::new(StringArray::from(names))]).unwrap()
}
+ fn make_matched_batch(row_ids: Vec<Option<i64>>, names: Vec<&str>) ->
RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("_ROW_ID", ArrowDataType::Int64, true),
+ ArrowField::new("name", ArrowDataType::Utf8, true),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int64Array::from(row_ids)),
+ Arc::new(StringArray::from(names)),
+ ],
+ )
+ .unwrap()
+ }
+
+ fn make_matched_batch_with_schema(
+ fields: Vec<ArrowField>,
+ columns: Vec<ArrayRef>,
+ ) -> RecordBatch {
+ RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)),
columns).unwrap()
+ }
+
+ fn test_file_index() -> Vec<FileRowRange> {
+ vec![FileRowRange {
+ first_row_id: 10,
+ last_row_id: 12,
+ row_count: 3,
+ partition: vec![],
+ bucket: 0,
+ bucket_path: String::new(),
+ snapshot_id: 1,
+ total_buckets: 1,
+ files: vec![make_test_file_meta("base.parquet", 3, Some(10), 1,
None)],
+ }]
+ }
+
#[tokio::test]
async fn test_write_partial_column_file() {
let file_io = test_file_io();
@@ -742,6 +833,108 @@ mod tests {
assert_eq!(files[1].row_count, 1);
}
+ #[test]
+ fn test_add_matched_batch_rejects_null_row_id() {
+ let file_io = test_file_io();
+ let table = test_table(&file_io, "memory:/test_de_add_batch");
+ let mut writer = DataEvolutionWriter::new(&table,
vec!["name".to_string()]).unwrap();
+
+ let err = writer
+ .add_matched_batch(make_matched_batch(
+ vec![Some(10), None],
+ vec!["alice", "bob"],
+ ))
+ .err()
+ .unwrap();
+
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if
message.contains("_ROW_ID must not be null"))
+ );
+ }
+
+ #[test]
+ fn test_add_matched_batch_rejects_missing_update_column() {
+ let file_io = test_file_io();
+ let table = test_table(&file_io,
"memory:/test_de_add_batch_missing_col");
+ let mut writer = DataEvolutionWriter::new(&table,
vec!["value".to_string()]).unwrap();
+
+ let err = writer
+ .add_matched_batch(make_matched_batch(vec![Some(10)],
vec!["alice"]))
+ .err()
+ .unwrap();
+
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if
message.contains("Column value not found"))
+ );
+ }
+
+ #[test]
+ fn test_group_matched_rows_rejects_null_row_id() {
+ let batches = vec![make_matched_batch(
+ vec![Some(10), None],
+ vec!["alice", "bob"],
+ )];
+
+ let err = group_matched_rows_by_file(&batches, &test_file_index())
+ .err()
+ .unwrap();
+
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if
message.contains("_ROW_ID must not be null"))
+ );
+ }
+
+ #[test]
+ fn test_group_matched_rows_rejects_duplicate_update() {
+ let batches = vec![make_matched_batch(
+ vec![Some(10), Some(10)],
+ vec!["alice", "ALICE"],
+ )];
+
+ let err = group_matched_rows_by_file(&batches, &test_file_index())
+ .err()
+ .unwrap();
+
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if
message.contains("duplicate UPDATE"))
+ );
+ }
+
+ #[test]
+ fn test_group_matched_rows_rejects_duplicate_update_across_batches() {
+ let batches = vec![
+ make_matched_batch(vec![Some(10)], vec!["alice"]),
+ make_matched_batch(vec![Some(10)], vec!["ALICE"]),
+ ];
+
+ let err = group_matched_rows_by_file(&batches, &test_file_index())
+ .err()
+ .unwrap();
+
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if
message.contains("duplicate UPDATE"))
+ );
+ }
+
+ #[test]
+ fn test_matched_column_uses_batch_schema() {
+ let batch = make_matched_batch_with_schema(
+ vec![
+ ArrowField::new("value", ArrowDataType::Int32, true),
+ ArrowField::new("name", ArrowDataType::Utf8, true),
+ ],
+ vec![
+ Arc::new(arrow_array::Int32Array::from(vec![20])),
+ Arc::new(StringArray::from(vec!["bob"])),
+ ],
+ );
+
+ let column = matched_column(&batch, "name").unwrap();
+ let names = column.as_any().downcast_ref::<StringArray>().unwrap();
+
+ assert_eq!(names.value(0), "bob");
+ }
+
#[test]
fn test_find_owning_file_with_grouped_ranges() {
// Simulate a file group: base file (3 cols, 100 rows) + partial file
(1 col, 100 rows)