This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 625fd3fef7 [python][ray] Fix overly strict rejection of matched updates on partitioned tables (#8127)
625fd3fef7 is described below

commit 625fd3fef7ada4e2dbc40d9824875d2a51f6edc4
Author: XiaoHongbo <[email protected]>
AuthorDate: Sat Jun 6 16:49:15 2026 +0800

    [python][ray] Fix overly strict rejection of matched updates on partitioned tables (#8127)
---
 .../pypaimon/ray/data_evolution_merge_into.py      | 15 ++++++---
 .../tests/ray_data_evolution_merge_into_test.py    | 39 +++++++++++++++++++---
 2 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index cbfcef907d..31627f9c16 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -126,11 +126,6 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
         c for c in full_target_field_names if c not in blob_cols
     ]
     on_map = dict(zip(target_on_cols, source_on_cols))
-    if when_matched and table.partition_keys:
-        raise ValueError(
-            "merge_into does not support matched clauses on partitioned "
-            "tables; cross-partition row movement is not implemented."
-        )
     matched_specs = [
         _NormalizedClause(
             spec=_normalize_set_spec(
@@ -140,6 +135,16 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
         )
         for c in when_matched
     ]
+    if matched_specs and table.partition_keys:
+        partition_set = set(table.partition_keys)
+        for clause in matched_specs:
+            modified_partition_cols = partition_set & set(clause.spec.keys())
+            if modified_partition_cols:
+                raise ValueError(
+                    f"merge_into does not support updating partition columns "
+                    f"{sorted(modified_partition_cols)}; cross-partition row "
+                    f"movement is not implemented."
+                )
     has_condition = any(
         c.condition is not None
         for c in list(when_matched) + list(when_not_matched)
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index b54eeb5cf0..c13e80818d 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -577,7 +577,7 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
 
         self.assertEqual(self._snapshot_id(target), before)
 
-    def test_partitioned_matched_update_rejected(self):
+    def test_matched_on_partitioned_table(self):
         pt_schema = pa.schema([
             ('pt', pa.string()),
             ('id', pa.int32()),
@@ -589,25 +589,56 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
         )
         self.catalog.create_table(name, s, False)
 
+        table = self.catalog.get_table(name)
+        wb = table.new_batch_write_builder()
+        writer = wb.new_write()
+        writer.write_arrow(pa.Table.from_pydict(
+            {
+                'pt': ['a', 'a'],
+                'id': pa.array([1, 2], type=pa.int32()),
+                'name': ['old_1', 'old_2'],
+            },
+            schema=pt_schema,
+        ))
+        wb.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
         source = pa.Table.from_pydict(
             {
                 'pt': ['a'],
                 'id': pa.array([1], type=pa.int32()),
-                'name': ['x'],
+                'name': ['new_1'],
             },
             schema=pt_schema,
         )
 
+        # Non-partition column update should succeed
+        merge_into(
+            target=name,
+            source=source,
+            catalog_options=self.catalog_options,
+            on=['id'],
+            when_matched=[WhenMatched(update={'name': source_col('name')})],
+            num_partitions=_TEST_NUM_PARTITIONS,
+        )
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(out['name'], ['new_1', 'old_2'])
+        self.assertEqual(out['pt'], ['a', 'a'])
+
+        # Partition column update should be rejected
         with self.assertRaises(ValueError) as ctx:
             merge_into(
                 target=name,
                 source=source,
                 catalog_options=self.catalog_options,
                 on=['id'],
-                when_matched=[WhenMatched(update='*')],
+                when_matched=[WhenMatched(update={'pt': source_col('pt')})],
                 num_partitions=_TEST_NUM_PARTITIONS,
             )
-        self.assertIn('partitioned', str(ctx.exception))
+        self.assertIn('partition', str(ctx.exception))
 
     def test_partitioned_insert_allowed(self):
         pt_schema = pa.schema([

Reply via email to