This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f0907ef7da [python] Fix duplicate /snapshot REST call in plan flow (#7997)
f0907ef7da is described below

commit f0907ef7da3df9c738dd98b54803f1cabc47a911
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu May 28 17:42:29 2026 +0800

    [python] Fix duplicate /snapshot REST call in plan flow (#7997)
---
 .../pypaimon/globalindex/global_index_scanner.py   | 11 ++-
 .../pypaimon/read/scanner/file_scanner.py          | 17 +++--
 paimon-python/pypaimon/tests/global_index_test.py  | 84 ++++++++++++++++++++++
 3 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index e8e34f641a..4f73ecc882 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -82,12 +82,16 @@ class GlobalIndexScanner:
         return GlobalIndexEvaluator(fields, readers_function, self._executor)
 
     @staticmethod
-    def create(table, index_files=None, partition_filter=None, predicate=None) -> Optional['GlobalIndexScanner']:
+    def create(table, index_files=None, partition_filter=None, predicate=None,
+               snapshot=None) -> Optional['GlobalIndexScanner']:
         """Create a GlobalIndexScanner.
 
         Can be called in two ways:
         1. create(table, index_files) - with explicit index files
-        2. create(table, partition_filter=..., predicate=...) - scan index files from snapshot
+        2. create(table, partition_filter=..., predicate=..., snapshot=...) -
+           scan index files from snapshot. ``snapshot`` may be passed in by the
+           caller to avoid a duplicate ``get_latest_snapshot`` REST round-trip
+           (the caller usually already fetched it for manifest scanning).
         """
         from pypaimon.index.index_file_handler import IndexFileHandler
 
@@ -119,7 +123,8 @@ class GlobalIndexScanner:
                 return False
             return global_index_meta.index_field_id in filter_field_ids
 
-        snapshot = table.snapshot_manager().get_latest_snapshot()
+        if snapshot is None:
+            snapshot = table.snapshot_manager().get_latest_snapshot()
         index_file_handler = IndexFileHandler(table=table)
         entries = index_file_handler.scan(snapshot, index_file_filter)
         scanned_index_files = [entry.index_file for entry in entries]
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 1d2831c194..5424d2d519 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -285,8 +285,14 @@ class FileScanner:
     def _create_data_evolution_split_generator(self):
         row_ranges = None
         score_getter = None
+        # Fetch snapshot once and share with global index evaluation to avoid
+        # a duplicate /snapshot REST round-trip (#7513).
+        manifest_files, snapshot = self.manifest_scanner()
+        self._scanned_snapshot = snapshot
+        self._scanned_snapshot_id = snapshot.id if snapshot else None
+
         global_index_result = self._global_index_result if self._global_index_result is not None \
-            else self._eval_global_index()
+            else self._eval_global_index(snapshot)
         if global_index_result is not None:
             row_ranges = global_index_result.results().to_range_list()
             if isinstance(global_index_result, ScoredGlobalIndexResult):
@@ -294,10 +300,6 @@ class FileScanner:
         if row_ranges is None and self.predicate is not None:
             row_ranges = _row_ranges_from_predicate(self.predicate)
 
-        manifest_files, snapshot = self.manifest_scanner()
-        self._scanned_snapshot = snapshot
-        self._scanned_snapshot_id = snapshot.id if snapshot else None
-
         # Filter manifest files by row ranges if available
         if row_ranges is not None:
             manifest_files = _filter_manifest_files_by_row_ranges(manifest_files, row_ranges)
@@ -324,7 +326,7 @@ class FileScanner:
             return []
         return self.read_manifest_entries(manifest_files)
 
-    def _eval_global_index(self):
+    def _eval_global_index(self, snapshot=None):
         # No filter - nothing to evaluate
         if self.predicate is None:
             return None
@@ -339,7 +341,8 @@ class FileScanner:
             scanner = GlobalIndexScanner.create(
                 self.table,
                 partition_filter=self.partition_key_predicate,
-                predicate=self.predicate
+                predicate=self.predicate,
+                snapshot=snapshot,
             )
             if scanner is None:
                 return None
diff --git a/paimon-python/pypaimon/tests/global_index_test.py b/paimon-python/pypaimon/tests/global_index_test.py
index 691c081c10..873e933584 100644
--- a/paimon-python/pypaimon/tests/global_index_test.py
+++ b/paimon-python/pypaimon/tests/global_index_test.py
@@ -16,8 +16,17 @@
 # under the License.
 
 import unittest
+from unittest.mock import patch
+
+import pyarrow as pa
 
 from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.index.index_file_handler import IndexFileHandler
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.tests.data_evolution_test_helpers import (
+    BatchModeMixin,
+    DataEvolutionTestBase,
+)
 from pypaimon.utils.range import Range
 
 
@@ -38,3 +47,78 @@ class GlobalIndexTest(unittest.TestCase):
             result = result.and_(other)
 
         self.assertEqual(result.results().cardinality(), 10001)
+
+
+class PlanSnapshotFetchRegressionTest(
+        BatchModeMixin, DataEvolutionTestBase, unittest.TestCase):
+
+    table_options = {
+        'row-tracking.enabled': 'true',
+        'data-evolution.enabled': 'true',
+        'global-index.enabled': 'true',
+        'bucket': '-1',
+    }
+
+    def test_plan_fetches_latest_snapshot_only_once(self):
+        table = self._create_table()
+        self._write_arrow(table, pa.table(
+            {'id': [1, 2, 3], 'name': ['a', 'b', 'c'],
+             'age': [10, 20, 30], 'city': ['x', 'y', 'z']},
+            schema=self.pa_schema))
+
+        fresh_table = self.catalog.get_table(table.identifier.get_full_name())
+        rb = fresh_table.new_read_builder()
+        rb = rb.with_filter(rb.new_predicate_builder().equal('id', 1))
+
+        orig_get_latest = SnapshotManager.get_latest_snapshot
+        call_count = [0]
+
+        def counting(self_sm, *args, **kwargs):
+            call_count[0] += 1
+            return orig_get_latest(self_sm, *args, **kwargs)
+
+        with patch.object(SnapshotManager, 'get_latest_snapshot', counting):
+            rb.new_scan().plan().splits()
+
+        self.assertEqual(
+            1, call_count[0],
+            msg=f"Plan fetched latest snapshot {call_count[0]} times — "
+                "duplicate from #7513: manifest_scanner + "
+                "GlobalIndexScanner.create both fetch independently.")
+
+    def test_time_travel_plan(self):
+        table = self._create_table()
+        self._write_arrow(table, pa.table(
+            {'id': [1], 'name': ['a'], 'age': [10], 'city': ['x']},
+            schema=self.pa_schema))
+        snapshot_1_id = table.snapshot_manager().get_latest_snapshot().id
+        self._write_arrow(table, pa.table(
+            {'id': [2], 'name': ['b'], 'age': [20], 'city': ['y']},
+            schema=self.pa_schema))
+
+        travel_table = self.catalog.get_table(
+            table.identifier.get_full_name()
+        ).copy({'scan.snapshot-id': str(snapshot_1_id)})
+        rb = travel_table.new_read_builder()
+        rb = rb.with_filter(rb.new_predicate_builder().equal('id', 1))
+
+        orig_scan = IndexFileHandler.scan
+        seen_snapshot_ids = []
+
+        def spy_scan(self_h, snapshot, entry_filter=None):
+            seen_snapshot_ids.append(snapshot.id if snapshot else None)
+            return orig_scan(self_h, snapshot, entry_filter)
+
+        with patch.object(IndexFileHandler, 'scan', spy_scan):
+            rb.new_scan().plan().splits()
+
+        self.assertTrue(seen_snapshot_ids,
+                        "IndexFileHandler.scan was never called")
+        self.assertEqual(
+            snapshot_1_id, seen_snapshot_ids[0],
+            msg=f"Global index evaluated against snapshot "
+                f"{seen_snapshot_ids[0]}, expected time-travel snapshot "
+                f"{snapshot_1_id}. Before #7513 was fixed, "
+                "GlobalIndexScanner.create self-fetched latest snapshot, "
+                "so global index used latest while manifest used the "
+                "time-travel snapshot — silent correctness bug.")

Reply via email to