This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 26c3b25  feat: support dynamic-bucket partial-update (#317)
26c3b25 is described below

commit 26c3b2501152ea190dbffe0743f6d303c72b5569
Author: QuakeWang <[email protected]>
AuthorDate: Thu May 14 14:41:13 2026 +0800

    feat: support dynamic-bucket partial-update (#317)
---
 .../datafusion/tests/dynamic_bucket_tables.rs      | 424 ++++++++++++++++++++-
 crates/paimon/src/spec/schema.rs                   |  36 ++
 crates/paimon/src/table/bucket_assigner_dynamic.rs |   2 +-
 crates/paimon/src/table/kv_file_writer.rs          |  26 --
 crates/paimon/src/table/table_write.rs             |  72 +++-
 5 files changed, 512 insertions(+), 48 deletions(-)

diff --git a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs 
b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs
index 59804b6..4c8cbcb 100644
--- a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs
+++ b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs
@@ -22,10 +22,10 @@ mod common;
 use common::{
     collect_id_name, collect_id_value, create_sql_context, create_test_env, 
setup_sql_context,
 };
-use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::arrow::array::{Array, Int32Array, StringArray};
 use paimon::catalog::Identifier;
 use paimon::spec::{IndexManifest, IndexManifestEntry};
-use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options, 
SnapshotManager};
+use paimon::{Catalog, CatalogOptions, DataSplit, FileSystemCatalog, Options, 
SnapshotManager};
 
 /// PK table with bucket=-1 (dynamic bucket) should write and read correctly.
 #[tokio::test]
@@ -82,6 +82,268 @@ async fn test_pk_dynamic_bucket() {
     );
 }
 
+async fn collect_partial_update_rows(
+    sql_context: &paimon_datafusion::SQLContext,
+    sql: &str,
+) -> Vec<(i32, Option<i32>, Option<String>)> {
+    let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap();
+    let mut rows = Vec::new();
+    for batch in &batches {
+        let ids = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .unwrap();
+        let ints = batch
+            .column_by_name("v_int")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .unwrap();
+        let strs = batch
+            .column_by_name("v_str")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .unwrap();
+        for i in 0..batch.num_rows() {
+            rows.push((
+                ids.value(i),
+                if ints.is_null(i) {
+                    None
+                } else {
+                    Some(ints.value(i))
+                },
+                if strs.is_null(i) {
+                    None
+                } else {
+                    Some(strs.value(i).to_string())
+                },
+            ));
+        }
+    }
+    rows.sort_by_key(|row| row.0);
+    rows
+}
+
+#[tokio::test]
+async fn test_pk_dynamic_bucket_partial_update() {
+    let (_tmp, sql_context) = setup_sql_context().await;
+
+    sql_context
+        .sql(
+            "CREATE TABLE paimon.test_db.t_dyn_partial_update (
+                id INT NOT NULL, v_int INT, v_str STRING,
+                PRIMARY KEY (id)
+            ) WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')",
+        )
+        .await
+        .unwrap();
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_partial_update VALUES
+             (1, 10, 'old-1'),
+             (2, 20, 'old-2')",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_partial_update VALUES
+             (1, CAST(NULL AS INT), 'new-1'),
+             (2, 200, CAST(NULL AS STRING)),
+             (3, 30, CAST(NULL AS STRING))",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_partial_update VALUES
+             (1, 111, CAST(NULL AS STRING))",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let rows = collect_partial_update_rows(
+        &sql_context,
+        "SELECT id, v_int, v_str FROM paimon.test_db.t_dyn_partial_update",
+    )
+    .await;
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, Some(111), Some("new-1".to_string())),
+            (2, Some(200), Some("old-2".to_string())),
+            (3, Some(30), None),
+        ]
+    );
+}
+
+async fn latest_splits(table: &paimon::Table) -> Vec<DataSplit> {
+    table
+        .new_read_builder()
+        .new_scan()
+        .with_scan_all_files()
+        .plan()
+        .await
+        .unwrap()
+        .splits()
+        .to_vec()
+}
+
+async fn bucket_containing_id(table: &paimon::Table, id: i32) -> i32 {
+    let read_builder = table.new_read_builder();
+    let read = read_builder.new_read().unwrap();
+    let splits = latest_splits(table).await;
+    let mut buckets = Vec::new();
+    for split in &splits {
+        let batches: Vec<_> =
+            
futures::TryStreamExt::try_collect(read.to_arrow(std::slice::from_ref(split)).unwrap())
+                .await
+                .unwrap();
+        let contains_id = batches.iter().any(|batch| {
+            let ids = batch
+                .column_by_name("id")
+                .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+                .unwrap();
+            (0..batch.num_rows()).any(|i| ids.value(i) == id)
+        });
+        if contains_id {
+            buckets.push(split.bucket());
+        }
+    }
+
+    buckets.sort();
+    buckets.dedup();
+    assert_eq!(
+        buckets.len(),
+        1,
+        "id={id} should be readable from exactly one bucket"
+    );
+    buckets[0]
+}
+
+async fn index_bucket_count(table: &paimon::Table) -> usize {
+    let entries = read_hash_index_entries(table).await;
+    let mut buckets = entries.iter().map(|entry| 
entry.bucket).collect::<Vec<_>>();
+    buckets.sort();
+    buckets.dedup();
+    buckets.len()
+}
+
+#[tokio::test]
+async fn test_pk_dynamic_bucket_partial_update_restores_existing_bucket() {
+    let (_tmp, catalog) = create_test_env();
+    let sql_context = create_sql_context(catalog.clone()).await;
+    sql_context
+        .sql("CREATE SCHEMA paimon.test_db")
+        .await
+        .expect("CREATE SCHEMA failed");
+
+    sql_context
+        .sql(
+            "CREATE TABLE paimon.test_db.t_dyn_partial_route (
+                id INT NOT NULL, v_int INT, v_str STRING,
+                PRIMARY KEY (id)
+            ) WITH (
+                'bucket' = '-1',
+                'dynamic-bucket.target-row-num' = '1',
+                'merge-engine' = 'partial-update'
+            )",
+        )
+        .await
+        .unwrap();
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_partial_route VALUES
+             (2, 20, 'old-2'),
+             (1, 10, 'old-1')",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let table = catalog
+        .get_table(&Identifier::new("test_db", "t_dyn_partial_route"))
+        .await
+        .unwrap();
+    assert_eq!(
+        index_bucket_count(&table).await,
+        2,
+        "target row number 1 should put two new keys into two HASH index 
buckets"
+    );
+    let id1_bucket = bucket_containing_id(&table, 1).await;
+    assert_ne!(
+        id1_bucket, 0,
+        "test setup writes id=1 second so missing index restore would allocate 
a different bucket"
+    );
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_partial_route VALUES
+             (1, CAST(NULL AS INT), 'new-1')",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let table = catalog
+        .get_table(&Identifier::new("test_db", "t_dyn_partial_route"))
+        .await
+        .unwrap();
+    let id1_bucket_after = bucket_containing_id(&table, 1).await;
+    assert_eq!(
+        id1_bucket_after, id1_bucket,
+        "restored HASH index should route id=1 back to its original bucket"
+    );
+
+    let splits = latest_splits(&table).await;
+    let id1_data_files_in_bucket: usize = splits
+        .iter()
+        .filter(|split| split.bucket() == id1_bucket)
+        .map(|split| split.data_files().len())
+        .sum();
+    assert_eq!(
+        id1_data_files_in_bucket, 2,
+        "id=1 initial row and later partial update should be in the same 
bucket"
+    );
+    let other_bucket_file_count: usize = splits
+        .iter()
+        .filter(|split| split.bucket() != id1_bucket)
+        .map(|split| split.data_files().len())
+        .sum();
+    assert_eq!(
+        other_bucket_file_count, 1,
+        "id=2 should remain in a separate bucket when target row number is 1"
+    );
+
+    let rows = collect_partial_update_rows(
+        &sql_context,
+        "SELECT id, v_int, v_str FROM paimon.test_db.t_dyn_partial_route",
+    )
+    .await;
+    assert_eq!(
+        rows,
+        vec![
+            (1, Some(10), Some("new-1".to_string())),
+            (2, Some(20), Some("old-2".to_string())),
+        ]
+    );
+}
+
 /// Dynamic bucket with partitioned table.
 #[tokio::test]
 async fn test_pk_dynamic_bucket_partitioned() {
@@ -160,6 +422,162 @@ async fn test_pk_dynamic_bucket_partitioned() {
     );
 }
 
+#[tokio::test]
+async fn test_pk_dynamic_bucket_partitioned_partial_update() {
+    let (_tmp, sql_context) = setup_sql_context().await;
+
+    sql_context
+        .sql(
+            "CREATE TABLE paimon.test_db.t_dyn_part_partial_update (
+                dt STRING, id INT NOT NULL, v_int INT, v_str STRING,
+                PRIMARY KEY (dt, id)
+            ) PARTITIONED BY (dt)
+            WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')",
+        )
+        .await
+        .unwrap();
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_part_partial_update VALUES
+             ('2024-01-01', 1, 10, 'old-a'),
+             ('2024-01-02', 1, 100, 'old-b')",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_dyn_part_partial_update VALUES
+             ('2024-01-01', 1, CAST(NULL AS INT), 'new-a'),
+             ('2024-01-02', 1, 200, CAST(NULL AS STRING))",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let batches = sql_context
+        .sql(
+            "SELECT dt, id, v_int, v_str
+             FROM paimon.test_db.t_dyn_part_partial_update
+             ORDER BY dt, id",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let mut rows = Vec::new();
+    for batch in &batches {
+        let dts = batch
+            .column_by_name("dt")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .unwrap();
+        let ids = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .unwrap();
+        let ints = batch
+            .column_by_name("v_int")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .unwrap();
+        let strs = batch
+            .column_by_name("v_str")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .unwrap();
+        for i in 0..batch.num_rows() {
+            rows.push((
+                dts.value(i).to_string(),
+                ids.value(i),
+                if ints.is_null(i) {
+                    None
+                } else {
+                    Some(ints.value(i))
+                },
+                if strs.is_null(i) {
+                    None
+                } else {
+                    Some(strs.value(i).to_string())
+                },
+            ));
+        }
+    }
+
+    assert_eq!(
+        rows,
+        vec![
+            (
+                "2024-01-01".to_string(),
+                1,
+                Some(10),
+                Some("new-a".to_string())
+            ),
+            (
+                "2024-01-02".to_string(),
+                1,
+                Some(200),
+                Some("old-b".to_string())
+            ),
+        ]
+    );
+}
+
+#[tokio::test]
+async fn test_rejects_cross_partition_dynamic_bucket_partial_update() {
+    let (_tmp, sql_context) = setup_sql_context().await;
+
+    sql_context
+        .sql(
+            "CREATE TABLE paimon.test_db.t_cross_partial_update (
+                dt STRING, id INT NOT NULL, v_int INT,
+                PRIMARY KEY (id)
+            ) PARTITIONED BY (dt)
+            WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')",
+        )
+        .await
+        .unwrap();
+
+    let result = sql_context
+        .sql("INSERT INTO paimon.test_db.t_cross_partial_update VALUES 
('2024-01-01', 1, 10)")
+        .await
+        .unwrap()
+        .collect()
+        .await;
+
+    let err = result.unwrap_err().to_string();
+    assert!(
+        err.contains("cross-partition update"),
+        "expected cross-partition partial-update rejection, got: {err}"
+    );
+}
+
+#[tokio::test]
+async fn test_rejects_partition_only_primary_key_partial_update() {
+    let (_tmp, sql_context) = setup_sql_context().await;
+
+    let result = sql_context
+        .sql(
+            "CREATE TABLE paimon.test_db.t_partition_only_pk (
+                dt STRING NOT NULL, v_int INT,
+                PRIMARY KEY (dt)
+            ) PARTITIONED BY (dt)
+            WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')",
+        )
+        .await;
+
+    let err = result.unwrap_err().to_string();
+    assert!(
+        err.contains("only one record in a partition"),
+        "expected partition-only primary key rejection, got: {err}"
+    );
+}
+
 /// Dynamic bucket with three commits — verifies sequence number tracking.
 #[tokio::test]
 async fn test_pk_dynamic_bucket_three_commits() {
@@ -225,7 +643,7 @@ async fn read_hash_index_entries(table: &paimon::Table) -> 
Vec<IndexManifestEntr
         .collect()
 }
 
-/// Helper: read raw hash values from a hash index file (flat i32 
little-endian).
+/// Helper: read raw hash values from a hash index file (flat i32 big-endian).
 async fn read_hash_file(table: &paimon::Table, file_name: &str) -> Vec<i32> {
     let path = format!("{}/index/{}", table.location(), file_name);
     let input = table.file_io().new_input(&path).unwrap();
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index ddada6a..e6d5af9 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -356,6 +356,7 @@ impl Schema {
         Self::validate_no_duplicate_fields(&field_names)?;
         Self::validate_partition_keys(&field_names, partition_keys)?;
         Self::validate_primary_keys(&field_names, primary_keys)?;
+        Self::validate_primary_keys_not_partition_only(partition_keys, 
primary_keys)?;
 
         if primary_keys.is_empty() {
             return Ok(fields.to_vec());
@@ -445,6 +446,29 @@ impl Schema {
         Ok(())
     }
 
+    fn validate_primary_keys_not_partition_only(
+        partition_keys: &[String],
+        primary_keys: &[String],
+    ) -> crate::Result<()> {
+        if primary_keys.is_empty() || partition_keys.is_empty() {
+            return Ok(());
+        }
+
+        let partition_set: HashSet<&str> = 
partition_keys.iter().map(String::as_str).collect();
+        if primary_keys
+            .iter()
+            .all(|pk| partition_set.contains(pk.as_str()))
+        {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Primary key constraint {primary_keys:?} should not be 
same with partition fields {partition_keys:?}, this will result in only one 
record in a partition"
+                ),
+            });
+        }
+
+        Ok(())
+    }
+
     fn validate_blob_fields(
         fields: &[DataField],
         partition_keys: &[String],
@@ -826,6 +850,18 @@ mod tests {
             "primary key not in columns should be rejected"
         );
 
+        // Primary key cannot be fully covered by partition keys.
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a", "b"])
+            .primary_key(["a"])
+            .build();
+        assert!(
+            matches!(res, Err(crate::Error::ConfigInvalid { message }) if 
message.contains("only one record in a partition")),
+            "primary key fully covered by partition keys should be rejected"
+        );
+
         // primary-key in options and DDL at same time
         let res = Schema::builder()
             .column("a", DataType::Int(IntType::with_nullable(false)))
diff --git a/crates/paimon/src/table/bucket_assigner_dynamic.rs 
b/crates/paimon/src/table/bucket_assigner_dynamic.rs
index 6adafb2..9db4db9 100644
--- a/crates/paimon/src/table/bucket_assigner_dynamic.rs
+++ b/crates/paimon/src/table/bucket_assigner_dynamic.rs
@@ -41,7 +41,7 @@ const HASH_INDEX: &str = "HASH";
 
 /// Read/write hash index files.
 ///
-/// A hash index file is a flat binary file containing `i32` values in 
little-endian byte order.
+/// A hash index file is a flat binary file containing `i32` values in 
big-endian byte order.
 /// Each value is the hash code of a primary key that belongs to the 
associated bucket.
 struct HashIndexFile;
 
diff --git a/crates/paimon/src/table/kv_file_writer.rs 
b/crates/paimon/src/table/kv_file_writer.rs
index 6b43445..a6d7180 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -78,7 +78,6 @@ pub(crate) struct KeyValueWriteConfig {
     pub sequence_field_indices: Vec<usize>,
     /// Merge engine for deduplication.
     pub merge_engine: MergeEngine,
-    pub dynamic_bucket_enabled: bool,
     pub deletion_vectors_enabled: bool,
 }
 
@@ -100,15 +99,6 @@ impl KeyValueFileWriter {
                     ),
                 });
             }
-
-            if config.dynamic_bucket_enabled {
-                return Err(crate::Error::Unsupported {
-                    message: format!(
-                        "Table '{}' uses merge-engine=partial-update with 
bucket=-1, which is not supported yet; currently only fixed-bucket 
partial-update is supported",
-                        config.table_name
-                    ),
-                });
-            }
         }
 
         Ok(Self {
@@ -553,7 +543,6 @@ mod tests {
             primary_key_types: vec![DataType::Int(IntType::new())],
             sequence_field_indices: vec![1],
             merge_engine,
-            dynamic_bucket_enabled: false,
             deletion_vectors_enabled: false,
         }
     }
@@ -621,21 +610,6 @@ mod tests {
         assert_eq!(selected, vec![0, 1]);
     }
 
-    #[test]
-    fn test_new_rejects_partial_update_dynamic_bucket() {
-        let mut config = test_write_config(MergeEngine::PartialUpdate);
-        config.dynamic_bucket_enabled = true;
-
-        let err = 
KeyValueFileWriter::new(FileIOBuilder::new("memory").build().unwrap(), config, 
0)
-            .err()
-            .unwrap();
-
-        assert!(matches!(
-            err,
-            crate::Error::Unsupported { message } if 
message.contains("bucket=-1")
-        ));
-    }
-
     #[test]
     fn test_new_rejects_partial_update_with_deletion_vectors() {
         let mut config = test_write_config(MergeEngine::PartialUpdate);
diff --git a/crates/paimon/src/table/table_write.rs 
b/crates/paimon/src/table/table_write.rs
index 167cc85..49c0019 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -139,13 +139,15 @@ impl TableWrite {
         let has_primary_keys = !schema.primary_keys().is_empty();
         let is_dynamic_bucket = has_primary_keys && total_buckets == -1;
 
-        let is_cross_partition = is_dynamic_bucket && 
!schema.partition_keys().is_empty() && {
-            let pk_set: HashSet<&str> = 
schema.primary_keys().iter().map(String::as_str).collect();
-            schema
-                .partition_keys()
-                .iter()
-                .any(|p| !pk_set.contains(p.as_str()))
-        };
+        let is_dynamic_cross_partition =
+            is_dynamic_bucket && !schema.partition_keys().is_empty() && {
+                let pk_set: HashSet<&str> =
+                    schema.primary_keys().iter().map(String::as_str).collect();
+                schema
+                    .partition_keys()
+                    .iter()
+                    .any(|p| !pk_set.contains(p.as_str()))
+            };
 
         if has_primary_keys
             && !is_dynamic_bucket
@@ -223,6 +225,14 @@ impl TableWrite {
 
         let merge_engine = core_options.merge_engine()?;
 
+        if is_dynamic_cross_partition && merge_engine == 
MergeEngine::PartialUpdate {
+            return Err(crate::Error::Unsupported {
+                message:
+                    "merge-engine=partial-update with cross-partition update 
is not supported yet"
+                        .to_string(),
+            });
+        }
+
         if has_primary_keys && core_options.rowkind_field().is_some() {
             return Err(crate::Error::Unsupported {
                 message: "KeyValueFileWriter does not support 
rowkind.field".to_string(),
@@ -231,7 +241,7 @@ impl TableWrite {
 
         let target_bucket_row_number = 
core_options.dynamic_bucket_target_row_num();
 
-        let bucket_assigner = if is_cross_partition {
+        let bucket_assigner = if is_dynamic_cross_partition {
             
BucketAssignerEnum::CrossPartition(Box::new(CrossPartitionAssigner::new(
                 table.clone(),
                 partition_field_indices,
@@ -676,10 +686,6 @@ impl TableWrite {
                 primary_key_types: self.primary_key_types.clone(),
                 sequence_field_indices: self.sequence_field_indices.clone(),
                 merge_engine: self.merge_engine,
-                dynamic_bucket_enabled: matches!(
-                    self.bucket_assigner,
-                    BucketAssignerEnum::Dynamic(_) | 
BucketAssignerEnum::CrossPartition(_)
-                ),
                 deletion_vectors_enabled: 
CoreOptions::new(self.table.schema().options())
                     .deletion_vectors_enabled(),
             },
@@ -930,7 +936,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn 
test_rejects_partial_update_dynamic_bucket_table_when_creating_writer() {
+    async fn test_allows_partial_update_dynamic_bucket_table() {
         let file_io = test_file_io();
         let table_path = "memory:/test_partial_update_dynamic_bucket_table";
         setup_dirs(&file_io, table_path).await;
@@ -953,13 +959,10 @@ mod tests {
         );
 
         let mut table_write = TableWrite::new(&table, 
"test-user".to_string()).unwrap();
-        let err = table_write
+        table_write
             .write_arrow_batch(&make_batch(vec![1], vec![10]))
             .await
-            .unwrap_err();
-        assert!(
-            matches!(err, crate::Error::Unsupported { message } if 
message.contains("bucket=-1"))
-        );
+            .unwrap();
     }
 
     #[tokio::test]
@@ -1900,6 +1903,39 @@ mod tests {
         ));
     }
 
+    #[test]
+    fn test_rejects_cross_partition_partial_update() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_cross_partial_update";
+        let schema = Schema::builder()
+            .column("pt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .partition_keys(["pt"])
+            .option("merge-engine", "partial-update")
+            .build()
+            .unwrap();
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "test_cross_partial_update"),
+            table_path.to_string(),
+            TableSchema::new(0, &schema),
+            None,
+        );
+
+        let err = match TableWrite::new(&table, "test-user".to_string()) {
+            Ok(_) => panic!("cross-partition partial-update should be 
rejected"),
+            Err(err) => err,
+        };
+
+        assert!(matches!(
+            err,
+            crate::Error::Unsupported { message }
+            if message.contains("cross-partition update")
+        ));
+    }
+
     #[tokio::test]
     async fn test_cross_partition_write_same_partition() {
         let file_io = test_file_io();

Reply via email to