This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e838bfcd6f [python][ray] Reject matched updates on partitioned tables in merge_into (#8078)
e838bfcd6f is described below

commit e838bfcd6f47d45d4d2d3675d5b4c7194fdd425c
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jun 2 21:42:59 2026 +0800

    [python][ray] Reject matched updates on partitioned tables in merge_into (#8078)
---
 docs/docs/pypaimon/ray-data.md                     |  4 ++
 .../pypaimon/ray/data_evolution_merge_into.py      |  6 +-
 .../tests/ray_data_evolution_merge_into_test.py    | 69 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index 589a15b9ca..cdbc7939ac 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -369,6 +369,10 @@ and `num_unchanged` is always `0`; conditional clauses (added later) can
 make `num_unchanged > 0`.
 
 **Notes:**
+- Partition key columns cannot be updated by matched clauses. If the target
+  table is partitioned, `merge_into` raises an error when `when_matched` is
+  specified, because cross-partition row movement is not implemented.
+  Not-matched inserts into partitioned tables work normally.
 - Blob columns are not written by `merge_into`: update leaves the existing
   `.blob` files untouched, and insert fills blob columns with `NULL`. The
   source data does not need to (and should not) carry blob columns.
diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index 7ab1ce70f8..b90abdc745 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -126,6 +126,11 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
         c for c in full_target_field_names if c not in blob_cols
     ]
     on_map = dict(zip(target_on_cols, source_on_cols))
+    if when_matched and table.partition_keys:
+        raise ValueError(
+            "merge_into does not support matched clauses on partitioned "
+            "tables; cross-partition row movement is not implemented."
+        )
     matched_specs = [
         _NormalizedClause(
             spec=_normalize_set_spec(
@@ -261,7 +266,6 @@ def _execute_and_commit(
             f.row_count for m in insert_msgs for f in m.new_files
         )
         all_msgs.extend(insert_msgs)
-    # TODO: add global-index update action check after PR #8045 merges
     if all_msgs:
         wb = table.new_batch_write_builder()
         tc = wb.new_commit()
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 6b918264bd..727185a2e4 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -450,6 +450,75 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
 
         self.assertEqual(self._snapshot_id(target), before)
 
+    def test_partitioned_matched_update_rejected(self):
+        pt_schema = pa.schema([
+            ('pt', pa.string()),
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+        s = Schema.from_pyarrow_schema(
+            pt_schema, partition_keys=['pt'], options=self.de_options,
+        )
+        self.catalog.create_table(name, s, False)
+
+        source = pa.Table.from_pydict(
+            {
+                'pt': ['a'],
+                'id': pa.array([1], type=pa.int32()),
+                'name': ['x'],
+            },
+            schema=pt_schema,
+        )
+
+        with self.assertRaises(ValueError) as ctx:
+            merge_into(
+                target=name,
+                source=source,
+                catalog_options=self.catalog_options,
+                on=['id'],
+                when_matched=[WhenMatched(update='*')],
+                num_partitions=_TEST_NUM_PARTITIONS,
+            )
+        self.assertIn('partitioned', str(ctx.exception))
+
+    def test_partitioned_insert_allowed(self):
+        pt_schema = pa.schema([
+            ('pt', pa.string()),
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+        s = Schema.from_pyarrow_schema(
+            pt_schema, partition_keys=['pt'], options=self.de_options,
+        )
+        self.catalog.create_table(name, s, False)
+
+        source = pa.Table.from_pydict(
+            {
+                'pt': ['a', 'b'],
+                'id': pa.array([1, 2], type=pa.int32()),
+                'name': ['x', 'y'],
+            },
+            schema=pt_schema,
+        )
+
+        merge_into(
+            target=name,
+            source=source,
+            catalog_options=self.catalog_options,
+            on=['id'],
+            when_not_matched=[WhenNotMatched(insert='*')],
+            num_partitions=_TEST_NUM_PARTITIONS,
+        )
+
+        table = self.catalog.get_table(name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(out['id'], [1, 2])
+        self.assertEqual(out['pt'], ['a', 'b'])
+
 
 class TargetProjectionTest(unittest.TestCase):
 

Reply via email to