This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8186094f9b [python][ray] Pin merge source table snapshot (#8110)
8186094f9b is described below
commit 8186094f9b14fcf69317972b6dbfee31d30e6d3f
Author: QuakeWang <[email protected]>
AuthorDate: Thu Jun 4 12:13:45 2026 +0800
[python][ray] Pin merge source table snapshot (#8110)
Ray merge-into already pins target reads to the base snapshot, but
Paimon source tables were still converted to a Ray Dataset through
`read_paimon` without an explicit snapshot. Because Ray Dataset execution
is lazy, source planning could therefore observe a newer table snapshot
than the one seen during merge preparation.
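This PR captures the latest snapshot id of source tables given by name
(string identifiers) during `_prepare` and passes it to `read_paimon`, so
the source side uses a stable snapshot throughout merge planning and
execution. If the source table has no snapshot yet, no snapshot id is
passed and `read_paimon` is called as before.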
---
.../pypaimon/ray/data_evolution_merge_into.py | 25 ++++++++++++++---
.../tests/ray_data_evolution_merge_into_test.py | 31 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index 5be68f301e..379655ec77 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -169,7 +169,19 @@ def _prepare(target, source, catalog_options,
when_matched, when_not_matched, on
for c in when_not_matched
]
- source_ds = _normalize_source(source, catalog_options)
+ source_snapshot_id = None
+ if isinstance(source, str):
+ source_snapshot = (
+ catalog.get_table(source)
+ .snapshot_manager()
+ .get_latest_snapshot()
+ )
+ if source_snapshot is not None:
+ source_snapshot_id = source_snapshot.id
+
+ source_ds = _normalize_source(
+ source, catalog_options, source_snapshot_id=source_snapshot_id,
+ )
_validate_source_on_cols(source_ds, source_on_cols)
_validate_source_has_target_cols(
source_ds, settable_field_names, on_map,
@@ -438,14 +450,21 @@ def _normalize_set_spec(
return {col: f"s.{on_map.get(col, col)}" for col in target_field_names}
-def _normalize_source(source: Any, catalog_options: Dict[str, str]):
+def _normalize_source(
+ source: Any,
+ catalog_options: Dict[str, str],
+ source_snapshot_id: Optional[int] = None,
+):
import ray.data
if isinstance(source, ray.data.Dataset):
return source
if isinstance(source, str):
from pypaimon.ray.ray_paimon import read_paimon
- return read_paimon(source, catalog_options)
+ read_kwargs = {}
+ if source_snapshot_id is not None:
+ read_kwargs["snapshot_id"] = source_snapshot_id
+ return read_paimon(source, catalog_options, **read_kwargs)
if isinstance(source, pa.Table):
return ray.data.from_arrow(source)
try:
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 47981088f2..7be8668320 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -21,6 +21,7 @@ import shutil
import tempfile
import unittest
import uuid
+from unittest.mock import Mock, patch
import pyarrow as pa
import ray
@@ -108,6 +109,36 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
snap = table.snapshot_manager().get_latest_snapshot()
return snap.id if snap is not None else None
+ def test_paimon_source_table_pins_snapshot(self):  # a Paimon source table must be read at the snapshot captured in _prepare
+ from pypaimon.ray import data_evolution_merge_into as m
+
+ target = self._create_table()
+ source = self._create_table()  # presumably a table identifier string (the pinning path needs str); verify
+ self._write(source, self._source(ids=(1,)))  # give the source table a snapshot to pin
+ expected_snapshot_id = self._snapshot_id(source)  # latest source snapshot id before _prepare runs
+
+ fake_ds = Mock()  # stands in for the Dataset read_paimon would return
+ fake_ds.schema.return_value = pa.schema([  # presumably consumed by _prepare's source column validation; verify
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('age', pa.int32()),
+ ])
+
+ with patch(  # patch the defining module: _normalize_source imports read_paimon at call time
+ 'pypaimon.ray.ray_paimon.read_paimon',
+ return_value=fake_ds,
+ ) as mock_read_paimon:
+ m._prepare(
+ target, source, self.catalog_options,
+ [WhenMatched(update='*')], [], ['id'],  # when_matched, when_not_matched, on
+ )
+
+ mock_read_paimon.assert_called_once_with(  # the captured snapshot must be forwarded as snapshot_id
+ source,
+ self.catalog_options,
+ snapshot_id=expected_snapshot_id,
+ )
+
def test_no_clause_raises(self):
target = self._create_table()
with self.assertRaises(ValueError):