JingsongLi commented on code in PR #7347:
URL: https://github.com/apache/paimon/pull/7347#discussion_r2898863833
##########
paimon-python/pypaimon/snapshot/snapshot_manager.py:
##########
@@ -158,18 +179,80 @@ def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
return final_snapshot
def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
- """
- Get a snapshot by its ID.
-
- Args:
- snapshot_id: The snapshot ID
-
- Returns:
- The snapshot with the specified ID, or None if not found
- """
+ """Get a snapshot by its ID, using cache if available."""
snapshot_file = self.get_snapshot_path(snapshot_id)
if not self.file_io.exists(snapshot_file):
return None
+ return self._load_snapshot(snapshot_id)
- snapshot_content = self.file_io.read_file_utf8(snapshot_file)
+ @cachedmethod(lambda self: self._cache)
+ def _load_snapshot(self, snapshot_id: int) -> Snapshot:
+ """Load a snapshot from storage."""
+ snapshot_content = self.file_io.read_file_utf8(self.get_snapshot_path(snapshot_id))
return JSON.from_json(snapshot_content, Snapshot)
+
+ def get_snapshots_batch(
+ self, snapshot_ids: List[int], max_workers: int = 4
+ ) -> Dict[int, Optional[Snapshot]]:
+ """Fetch multiple snapshots in parallel, returning {id: Snapshot|None}."""
+ if not snapshot_ids:
+ return {}
+
+ # First, batch check which snapshot files exist
+ paths = [self.get_snapshot_path(sid) for sid in snapshot_ids]
+ existence = self.file_io.exists_batch(paths)
+
+ # Filter to only existing snapshots
+ existing_ids = [
+ sid for sid, path in zip(snapshot_ids, paths)
+ if existence.get(path, False)
+ ]
+
+ if not existing_ids:
+ return {sid: None for sid in snapshot_ids}
+
+ # Fetch existing snapshots in parallel
+ def fetch_one(sid: int) -> tuple:
+ try:
+ return (sid, self.get_snapshot_by_id(sid))
+ except Exception:
+ return (sid, None)
+
+ results = {sid: None for sid in snapshot_ids}
+
+ with ThreadPoolExecutor(max_workers=min(len(existing_ids), max_workers)) as executor:
+ for sid, snapshot in executor.map(fetch_one, existing_ids):
+ results[sid] = snapshot
+
+ return results
+
+ def find_next_scannable(
Review Comment:
What's the use of this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]