This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e838bfcd6f [python][ray] Reject matched updates on partitioned tables
in merge_into (#8078)
e838bfcd6f is described below
commit e838bfcd6f47d45d4d2d3675d5b4c7194fdd425c
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jun 2 21:42:59 2026 +0800
[python][ray] Reject matched updates on partitioned tables in merge_into
(#8078)
---
docs/docs/pypaimon/ray-data.md | 4 ++
.../pypaimon/ray/data_evolution_merge_into.py | 6 +-
.../tests/ray_data_evolution_merge_into_test.py | 69 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index 589a15b9ca..cdbc7939ac 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -369,6 +369,10 @@ and `num_unchanged` is always `0`; conditional clauses (added later) can
make `num_unchanged > 0`.
**Notes:**
+- Matched clauses are not supported on partitioned tables, even if they do not
+ update partition key columns: if the target table is partitioned and
+ `when_matched` is given, `merge_into` raises a `ValueError`, because cross-partition
+ row movement is not implemented. Not-matched inserts into partitioned tables work normally.
- Blob columns are not written by `merge_into`: update leaves the existing
`.blob` files untouched, and insert fills blob columns with `NULL`. The
source data does not need to (and should not) carry blob columns.
diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index 7ab1ce70f8..b90abdc745 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -126,6 +126,11 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
c for c in full_target_field_names if c not in blob_cols
]
on_map = dict(zip(target_on_cols, source_on_cols))
+ if when_matched and table.partition_keys:
+ raise ValueError(
+ "merge_into does not support matched clauses on partitioned "
+ "tables; cross-partition row movement is not implemented."
+ )
matched_specs = [
_NormalizedClause(
spec=_normalize_set_spec(
@@ -261,7 +266,6 @@ def _execute_and_commit(
f.row_count for m in insert_msgs for f in m.new_files
)
all_msgs.extend(insert_msgs)
- # TODO: add global-index update action check after PR #8045 merges
if all_msgs:
wb = table.new_batch_write_builder()
tc = wb.new_commit()
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 6b918264bd..727185a2e4 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -450,6 +450,75 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
self.assertEqual(self._snapshot_id(target), before)
+ def test_partitioned_matched_update_rejected(self):
+ """merge_into with when_matched on a partitioned target raises ValueError."""
+ pt_schema = pa.schema([
+ ('pt', pa.string()),
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
+ name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+ # Target table partitioned on 'pt'; self.de_options presumably holds the
+ # data-evolution table options shared by this test class — confirm.
+ s = Schema.from_pyarrow_schema(
+ pt_schema, partition_keys=['pt'], options=self.de_options,
+ )
+ self.catalog.create_table(name, s, False)
+
+ source = pa.Table.from_pydict(
+ {
+ 'pt': ['a'],
+ 'id': pa.array([1], type=pa.int32()),
+ 'name': ['x'],
+ },
+ schema=pt_schema,
+ )
+
+ # The target was just created and no rows were written to it, so nothing
+ # can actually match: the error must come from the up-front check on
+ # partition_keys + when_matched, not from processing matched rows.
+ with self.assertRaises(ValueError) as ctx:
+ merge_into(
+ target=name,
+ source=source,
+ catalog_options=self.catalog_options,
+ on=['id'],
+ when_matched=[WhenMatched(update='*')],
+ num_partitions=_TEST_NUM_PARTITIONS,
+ )
+ # Pin the message so an unrelated ValueError does not satisfy the test.
+ self.assertIn('partitioned', str(ctx.exception))
+
+ def test_partitioned_insert_allowed(self):
+ pt_schema = pa.schema([
+ ('pt', pa.string()),
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
+ name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+ s = Schema.from_pyarrow_schema(
+ pt_schema, partition_keys=['pt'], options=self.de_options,
+ )
+ self.catalog.create_table(name, s, False)
+
+ source = pa.Table.from_pydict(
+ {
+ 'pt': ['a', 'b'],
+ 'id': pa.array([1, 2], type=pa.int32()),
+ 'name': ['x', 'y'],
+ },
+ schema=pt_schema,
+ )
+
+ merge_into(
+ target=name,
+ source=source,
+ catalog_options=self.catalog_options,
+ on=['id'],
+ when_not_matched=[WhenNotMatched(insert='*')],
+ num_partitions=_TEST_NUM_PARTITIONS,
+ )
+
+ table = self.catalog.get_table(name)
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+ self.assertEqual(out['id'], [1, 2])
+ self.assertEqual(out['pt'], ['a', 'b'])
+
class TargetProjectionTest(unittest.TestCase):