This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f0907ef7da [python] Fix duplicate /snapshot REST call in plan flow (#7997)
f0907ef7da is described below
commit f0907ef7da3df9c738dd98b54803f1cabc47a911
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu May 28 17:42:29 2026 +0800
[python] Fix duplicate /snapshot REST call in plan flow (#7997)
---
.../pypaimon/globalindex/global_index_scanner.py | 11 ++-
.../pypaimon/read/scanner/file_scanner.py | 17 +++--
paimon-python/pypaimon/tests/global_index_test.py | 84 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 10 deletions(-)
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index e8e34f641a..4f73ecc882 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -82,12 +82,16 @@ class GlobalIndexScanner:
return GlobalIndexEvaluator(fields, readers_function, self._executor)
@staticmethod
- def create(table, index_files=None, partition_filter=None, predicate=None) -> Optional['GlobalIndexScanner']:
+ def create(table, index_files=None, partition_filter=None, predicate=None,
+ snapshot=None) -> Optional['GlobalIndexScanner']:
"""Create a GlobalIndexScanner.
Can be called in two ways:
1. create(table, index_files) - with explicit index files
- 2. create(table, partition_filter=..., predicate=...) - scan index files from snapshot
+ 2. create(table, partition_filter=..., predicate=..., snapshot=...) -
+ scan index files from snapshot. ``snapshot`` may be passed in by the
+ caller to avoid a duplicate ``get_latest_snapshot`` REST round-trip
+ (the caller usually already fetched it for manifest scanning).
"""
from pypaimon.index.index_file_handler import IndexFileHandler
@@ -119,7 +123,8 @@ class GlobalIndexScanner:
return False
return global_index_meta.index_field_id in filter_field_ids
- snapshot = table.snapshot_manager().get_latest_snapshot()
+ if snapshot is None:
+ snapshot = table.snapshot_manager().get_latest_snapshot()
index_file_handler = IndexFileHandler(table=table)
entries = index_file_handler.scan(snapshot, index_file_filter)
scanned_index_files = [entry.index_file for entry in entries]
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 1d2831c194..5424d2d519 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -285,8 +285,14 @@ class FileScanner:
def _create_data_evolution_split_generator(self):
row_ranges = None
score_getter = None
+ # Fetch snapshot once and share with global index evaluation to avoid
+ # a duplicate /snapshot REST round-trip (#7513).
+ manifest_files, snapshot = self.manifest_scanner()
+ self._scanned_snapshot = snapshot
+ self._scanned_snapshot_id = snapshot.id if snapshot else None
+
global_index_result = self._global_index_result if self._global_index_result is not None \
- else self._eval_global_index()
+ else self._eval_global_index(snapshot)
if global_index_result is not None:
row_ranges = global_index_result.results().to_range_list()
if isinstance(global_index_result, ScoredGlobalIndexResult):
@@ -294,10 +300,6 @@ class FileScanner:
if row_ranges is None and self.predicate is not None:
row_ranges = _row_ranges_from_predicate(self.predicate)
- manifest_files, snapshot = self.manifest_scanner()
- self._scanned_snapshot = snapshot
- self._scanned_snapshot_id = snapshot.id if snapshot else None
-
# Filter manifest files by row ranges if available
if row_ranges is not None:
manifest_files = _filter_manifest_files_by_row_ranges(manifest_files, row_ranges)
@@ -324,7 +326,7 @@ class FileScanner:
return []
return self.read_manifest_entries(manifest_files)
- def _eval_global_index(self):
+ def _eval_global_index(self, snapshot=None):
# No filter - nothing to evaluate
if self.predicate is None:
return None
@@ -339,7 +341,8 @@ class FileScanner:
scanner = GlobalIndexScanner.create(
self.table,
partition_filter=self.partition_key_predicate,
- predicate=self.predicate
+ predicate=self.predicate,
+ snapshot=snapshot,
)
if scanner is None:
return None
diff --git a/paimon-python/pypaimon/tests/global_index_test.py b/paimon-python/pypaimon/tests/global_index_test.py
index 691c081c10..873e933584 100644
--- a/paimon-python/pypaimon/tests/global_index_test.py
+++ b/paimon-python/pypaimon/tests/global_index_test.py
@@ -16,8 +16,17 @@
# under the License.
import unittest
+from unittest.mock import patch
+
+import pyarrow as pa
from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.index.index_file_handler import IndexFileHandler
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.tests.data_evolution_test_helpers import (
+ BatchModeMixin,
+ DataEvolutionTestBase,
+)
from pypaimon.utils.range import Range
@@ -38,3 +47,78 @@ class GlobalIndexTest(unittest.TestCase):
result = result.and_(other)
self.assertEqual(result.results().cardinality(), 10001)
+
+
+class PlanSnapshotFetchRegressionTest(
+ BatchModeMixin, DataEvolutionTestBase, unittest.TestCase):
+
+ table_options = {
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'global-index.enabled': 'true',
+ 'bucket': '-1',
+ }
+
+ def test_plan_fetches_latest_snapshot_only_once(self):
+ table = self._create_table()
+ self._write_arrow(table, pa.table(
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c'],
+ 'age': [10, 20, 30], 'city': ['x', 'y', 'z']},
+ schema=self.pa_schema))
+
+ fresh_table = self.catalog.get_table(table.identifier.get_full_name())
+ rb = fresh_table.new_read_builder()
+ rb = rb.with_filter(rb.new_predicate_builder().equal('id', 1))
+
+ orig_get_latest = SnapshotManager.get_latest_snapshot
+ call_count = [0]
+
+ def counting(self_sm, *args, **kwargs):
+ call_count[0] += 1
+ return orig_get_latest(self_sm, *args, **kwargs)
+
+ with patch.object(SnapshotManager, 'get_latest_snapshot', counting):
+ rb.new_scan().plan().splits()
+
+ self.assertEqual(
+ 1, call_count[0],
+ msg=f"Plan fetched latest snapshot {call_count[0]} times — "
+ "duplicate from #7513: manifest_scanner + "
+ "GlobalIndexScanner.create both fetch independently.")
+
+ def test_time_travel_plan(self):
+ table = self._create_table()
+ self._write_arrow(table, pa.table(
+ {'id': [1], 'name': ['a'], 'age': [10], 'city': ['x']},
+ schema=self.pa_schema))
+ snapshot_1_id = table.snapshot_manager().get_latest_snapshot().id
+ self._write_arrow(table, pa.table(
+ {'id': [2], 'name': ['b'], 'age': [20], 'city': ['y']},
+ schema=self.pa_schema))
+
+ travel_table = self.catalog.get_table(
+ table.identifier.get_full_name()
+ ).copy({'scan.snapshot-id': str(snapshot_1_id)})
+ rb = travel_table.new_read_builder()
+ rb = rb.with_filter(rb.new_predicate_builder().equal('id', 1))
+
+ orig_scan = IndexFileHandler.scan
+ seen_snapshot_ids = []
+
+ def spy_scan(self_h, snapshot, entry_filter=None):
+ seen_snapshot_ids.append(snapshot.id if snapshot else None)
+ return orig_scan(self_h, snapshot, entry_filter)
+
+ with patch.object(IndexFileHandler, 'scan', spy_scan):
+ rb.new_scan().plan().splits()
+
+ self.assertTrue(seen_snapshot_ids,
+ "IndexFileHandler.scan was never called")
+ self.assertEqual(
+ snapshot_1_id, seen_snapshot_ids[0],
+ msg=f"Global index evaluated against snapshot "
+ f"{seen_snapshot_ids[0]}, expected time-travel snapshot "
+ f"{snapshot_1_id}. Before #7513 was fixed, "
+ "GlobalIndexScanner.create self-fetched latest snapshot, "
+ "so global index used latest while manifest used the "
+ "time-travel snapshot — silent correctness bug.")