This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 625fd3fef7 [python][ray] Fix overly strict rejection of matched
updates on partitioned tables (#8127)
625fd3fef7 is described below
commit 625fd3fef7ada4e2dbc40d9824875d2a51f6edc4
Author: XiaoHongbo <[email protected]>
AuthorDate: Sat Jun 6 16:49:15 2026 +0800
[python][ray] Fix overly strict rejection of matched updates on partitioned
tables (#8127)
---
.../pypaimon/ray/data_evolution_merge_into.py | 15 ++++++---
.../tests/ray_data_evolution_merge_into_test.py | 39 +++++++++++++++++++---
2 files changed, 45 insertions(+), 9 deletions(-)
diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index cbfcef907d..31627f9c16 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -126,11 +126,6 @@ def _prepare(target, source, catalog_options,
when_matched, when_not_matched, on
c for c in full_target_field_names if c not in blob_cols
]
on_map = dict(zip(target_on_cols, source_on_cols))
- if when_matched and table.partition_keys:
- raise ValueError(
- "merge_into does not support matched clauses on partitioned "
- "tables; cross-partition row movement is not implemented."
- )
matched_specs = [
_NormalizedClause(
spec=_normalize_set_spec(
@@ -140,6 +135,16 @@ def _prepare(target, source, catalog_options,
when_matched, when_not_matched, on
)
for c in when_matched
]
+ if matched_specs and table.partition_keys:
+ partition_set = set(table.partition_keys)
+ for clause in matched_specs:
+ modified_partition_cols = partition_set & set(clause.spec.keys())
+ if modified_partition_cols:
+ raise ValueError(
+ f"merge_into does not support updating partition columns "
+ f"{sorted(modified_partition_cols)}; cross-partition row "
+ f"movement is not implemented."
+ )
has_condition = any(
c.condition is not None
for c in list(when_matched) + list(when_not_matched)
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index b54eeb5cf0..c13e80818d 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -577,7 +577,7 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
self.assertEqual(self._snapshot_id(target), before)
- def test_partitioned_matched_update_rejected(self):
+ def test_matched_on_partitioned_table(self):
pt_schema = pa.schema([
('pt', pa.string()),
('id', pa.int32()),
@@ -589,25 +589,56 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
)
self.catalog.create_table(name, s, False)
+ table = self.catalog.get_table(name)
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict(
+ {
+ 'pt': ['a', 'a'],
+ 'id': pa.array([1, 2], type=pa.int32()),
+ 'name': ['old_1', 'old_2'],
+ },
+ schema=pt_schema,
+ ))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
source = pa.Table.from_pydict(
{
'pt': ['a'],
'id': pa.array([1], type=pa.int32()),
- 'name': ['x'],
+ 'name': ['new_1'],
},
schema=pt_schema,
)
+ # Non-partition column update should succeed
+ merge_into(
+ target=name,
+ source=source,
+ catalog_options=self.catalog_options,
+ on=['id'],
+ when_matched=[WhenMatched(update={'name': source_col('name')})],
+ num_partitions=_TEST_NUM_PARTITIONS,
+ )
+
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+ self.assertEqual(out['name'], ['new_1', 'old_2'])
+ self.assertEqual(out['pt'], ['a', 'a'])
+
+ # Partition column update should be rejected
with self.assertRaises(ValueError) as ctx:
merge_into(
target=name,
source=source,
catalog_options=self.catalog_options,
on=['id'],
- when_matched=[WhenMatched(update='*')],
+ when_matched=[WhenMatched(update={'pt': source_col('pt')})],
num_partitions=_TEST_NUM_PARTITIONS,
)
- self.assertIn('partitioned', str(ctx.exception))
+ self.assertIn('partition', str(ctx.exception))
def test_partitioned_insert_allowed(self):
pt_schema = pa.schema([