This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8186094f9b [python][ray] Pin merge source table snapshot (#8110)
8186094f9b is described below

commit 8186094f9b14fcf69317972b6dbfee31d30e6d3f
Author: QuakeWang <[email protected]>
AuthorDate: Thu Jun 4 12:13:45 2026 +0800

    [python][ray] Pin merge source table snapshot (#8110)
    
    Ray merge-into already pins target reads to the base snapshot, but
    Paimon source tables were still normalized through `read_paimon` without
    an explicit snapshot. Because Ray Dataset execution is lazy, source
    planning could otherwise observe a later table snapshot than the one
    seen during merge preparation.
    
    This PR captures the latest snapshot id for source tables passed by name
    (as strings) during `_prepare` and passes it to `read_paimon`, so the
    source side uses a stable snapshot throughout merge planning and execution.
---
 .../pypaimon/ray/data_evolution_merge_into.py      | 25 ++++++++++++++---
 .../tests/ray_data_evolution_merge_into_test.py    | 31 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index 5be68f301e..379655ec77 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -169,7 +169,19 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
         for c in when_not_matched
     ]
 
-    source_ds = _normalize_source(source, catalog_options)
+    source_snapshot_id = None
+    if isinstance(source, str):
+        source_snapshot = (
+            catalog.get_table(source)
+            .snapshot_manager()
+            .get_latest_snapshot()
+        )
+        if source_snapshot is not None:
+            source_snapshot_id = source_snapshot.id
+
+    source_ds = _normalize_source(
+        source, catalog_options, source_snapshot_id=source_snapshot_id,
+    )
     _validate_source_on_cols(source_ds, source_on_cols)
     _validate_source_has_target_cols(
         source_ds, settable_field_names, on_map,
@@ -438,14 +450,21 @@ def _normalize_set_spec(
     return {col: f"s.{on_map.get(col, col)}" for col in target_field_names}
 
 
-def _normalize_source(source: Any, catalog_options: Dict[str, str]):
+def _normalize_source(
+    source: Any,
+    catalog_options: Dict[str, str],
+    source_snapshot_id: Optional[int] = None,
+):
     import ray.data
 
     if isinstance(source, ray.data.Dataset):
         return source
     if isinstance(source, str):
         from pypaimon.ray.ray_paimon import read_paimon
-        return read_paimon(source, catalog_options)
+        read_kwargs = {}
+        if source_snapshot_id is not None:
+            read_kwargs["snapshot_id"] = source_snapshot_id
+        return read_paimon(source, catalog_options, **read_kwargs)
     if isinstance(source, pa.Table):
         return ray.data.from_arrow(source)
     try:
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 47981088f2..7be8668320 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -21,6 +21,7 @@ import shutil
 import tempfile
 import unittest
 import uuid
+from unittest.mock import Mock, patch
 
 import pyarrow as pa
 import ray
@@ -108,6 +109,36 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
         snap = table.snapshot_manager().get_latest_snapshot()
         return snap.id if snap is not None else None
 
+    def test_paimon_source_table_pins_snapshot(self):
+        from pypaimon.ray import data_evolution_merge_into as m
+
+        target = self._create_table()
+        source = self._create_table()
+        self._write(source, self._source(ids=(1,)))
+        expected_snapshot_id = self._snapshot_id(source)
+
+        fake_ds = Mock()
+        fake_ds.schema.return_value = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('age', pa.int32()),
+        ])
+
+        with patch(
+                'pypaimon.ray.ray_paimon.read_paimon',
+                return_value=fake_ds,
+        ) as mock_read_paimon:
+            m._prepare(
+                target, source, self.catalog_options,
+                [WhenMatched(update='*')], [], ['id'],
+            )
+
+        mock_read_paimon.assert_called_once_with(
+            source,
+            self.catalog_options,
+            snapshot_id=expected_snapshot_id,
+        )
+
     def test_no_clause_raises(self):
         target = self._create_table()
         with self.assertRaises(ValueError):

Reply via email to