tub commented on code in PR #7347:
URL: https://github.com/apache/paimon/pull/7347#discussion_r2895271651


##########
paimon-python/pypaimon/common/identifier.py:
##########
@@ -37,13 +37,78 @@ def create(cls, database: str, object: str) -> "Identifier":
 
     @classmethod
     def from_string(cls, full_name: str) -> "Identifier":
-        parts = full_name.split(".")
-        if len(parts) == 2:
-            return cls(parts[0], parts[1])
-        elif len(parts) == 3:
-            return cls(parts[0], parts[1], parts[2])
-        else:
-            raise ValueError("Invalid identifier format: {}".format(full_name))
+        """

Review Comment:
   Too verbose, can be described in a sentence or two



##########
paimon-python/pypaimon/common/options/core_options.py:
##########
@@ -46,6 +45,21 @@ class MergeEngine(str, Enum):
     FIRST_ROW = "first-row"
 
 
+class ChangelogProducer(str, Enum):

Review Comment:
   We don't need to add ChangelogProducer yet, move to #7348 



##########
paimon-python/pypaimon/common/identifier.py:
##########
@@ -37,13 +37,78 @@ def create(cls, database: str, object: str) -> "Identifier":
 
     @classmethod
     def from_string(cls, full_name: str) -> "Identifier":
-        parts = full_name.split(".")
-        if len(parts) == 2:
-            return cls(parts[0], parts[1])
-        elif len(parts) == 3:
-            return cls(parts[0], parts[1], parts[2])
-        else:
-            raise ValueError("Invalid identifier format: {}".format(full_name))
+        """
+        Parse an identifier from a string.
+
+        Supports two modes:
+        1. Backtick-quoted (for names with periods):
+           - `database.name`.table → database="database.name", object="table"
+           - `db`.`table` → database="db", object="table"
+
+        2. Simple split on first period (Java-compatible fallback):
+           - database.table → database="database", object="table"
+
+        For database names containing periods, use backticks or
+        Identifier.create("database.name", "table") directly.
+
+        Args:
+            full_name: The full identifier string
+
+        Returns:
+            Identifier instance
+
+        Raises:
+            ValueError: If the format is invalid
+        """
+        if not full_name or not full_name.strip():
+            raise ValueError("fullName cannot be null or empty")
+
+        # Check if backticks are used - if so, parse with backtick support
+        if '`' in full_name:
+            return cls._parse_with_backticks(full_name)
+
+        # Otherwise, use Java-compatible split on first period only
+        parts = full_name.split(".", 1)
+
+        if len(parts) != 2:
+            raise ValueError(
+                f"Cannot get splits from '{full_name}' to get database and 
object"
+            )
+
+        return cls(parts[0], parts[1])
+
+    @classmethod
+    def _parse_with_backticks(cls, full_name: str) -> "Identifier":
+        """
+        Parse identifier with backtick-quoted segments.
+
+        Examples:
+        - `db.name`.table → database="db.name", object="table"
+        - `db`.`table` → database="db", object="table"

Review Comment:
   ```suggestion
   ```



##########
paimon-python/pypaimon/filesystem/pyarrow_file_io.py:
##########
@@ -233,13 +234,36 @@ def exists(self, path: str) -> bool:
         path_str = self.to_filesystem_path(path)
         return self._get_file_info(path_str).type != pafs.FileType.NotFound
 
+    def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
+        """
+        Check existence of multiple paths in one S3 API call.

Review Comment:
   Too verbose



##########
paimon-python/pypaimon/manifest/manifest_list_manager.py:
##########
@@ -50,10 +58,60 @@ def read_all(self, snapshot: Optional[Snapshot]) -> 
List[ManifestFileMeta]:
         manifest_files.extend(delta_manifests)
         return manifest_files
 
+    def read_base(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+        """
+        Read only base manifest list (all files at this snapshot point).
+
+        This is useful for computing diffs between two snapshots, where
+        we only need the base state and not the delta changes.
+
+        Args:
+            snapshot: The snapshot to read base manifest from
+
+        Returns:
+            List of ManifestFileMeta representing all files at this snapshot
+        """
+        return self.read(snapshot.base_manifest_list)
+
     def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
         return self.read(snapshot.delta_manifest_list)
 
+    def read_changelog(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+        """

Review Comment:
   Too verbose



##########
paimon-python/pypaimon/snapshot/snapshot_manager.py:
##########
@@ -28,7 +31,10 @@
 class SnapshotManager:
     """Manager for snapshot files using unified FileIO."""
 
-    def __init__(self, table):
+    # Default cache size for snapshots (matching Java's typical cache size)

Review Comment:
   ```suggestion
   ```



##########
paimon-python/pypaimon/snapshot/snapshot_manager.py:
##########
@@ -159,17 +192,134 @@ def earlier_or_equal_time_mills(self, timestamp: int) -> 
Optional[Snapshot]:
 
     def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
         """
-        Get a snapshot by its ID.
+        Get a snapshot by its ID, using cache if available.
 
         Args:
             snapshot_id: The snapshot ID
 
         Returns:
             The snapshot with the specified ID, or None if not found
         """
+        # Check cache first
+        if snapshot_id in self._cache:
+            self._cache_hits += 1
+            return self._cache[snapshot_id]
+
+        self._cache_misses += 1
         snapshot_file = self.get_snapshot_path(snapshot_id)
         if not self.file_io.exists(snapshot_file):
             return None
 
         snapshot_content = self.file_io.read_file_utf8(snapshot_file)
-        return JSON.from_json(snapshot_content, Snapshot)
+        snapshot = JSON.from_json(snapshot_content, Snapshot)
+
+        # Cache the result
+        if snapshot is not None:
+            self._cache[snapshot_id] = snapshot
+
+        return snapshot
+
+    def get_snapshots_batch(
+        self, snapshot_ids: List[int], max_workers: int = 4
+    ) -> Dict[int, Optional[Snapshot]]:
+        """

Review Comment:
   Too verbose



##########
paimon-python/pypaimon/snapshot/snapshot_manager.py:
##########
@@ -159,17 +192,134 @@ def earlier_or_equal_time_mills(self, timestamp: int) -> 
Optional[Snapshot]:
 
     def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
         """
-        Get a snapshot by its ID.
+        Get a snapshot by its ID, using cache if available.
 
         Args:
             snapshot_id: The snapshot ID
 
         Returns:
             The snapshot with the specified ID, or None if not found
         """
+        # Check cache first
+        if snapshot_id in self._cache:
+            self._cache_hits += 1
+            return self._cache[snapshot_id]
+
+        self._cache_misses += 1
         snapshot_file = self.get_snapshot_path(snapshot_id)
         if not self.file_io.exists(snapshot_file):
             return None
 
         snapshot_content = self.file_io.read_file_utf8(snapshot_file)
-        return JSON.from_json(snapshot_content, Snapshot)
+        snapshot = JSON.from_json(snapshot_content, Snapshot)
+
+        # Cache the result
+        if snapshot is not None:
+            self._cache[snapshot_id] = snapshot
+
+        return snapshot
+
+    def get_snapshots_batch(
+        self, snapshot_ids: List[int], max_workers: int = 4
+    ) -> Dict[int, Optional[Snapshot]]:
+        """
+        Fetch multiple snapshots in parallel.
+
+        This is more efficient than sequential fetches when reading from S3
+        or other high-latency storage systems.
+
+        Args:
+            snapshot_ids: List of snapshot IDs to fetch
+            max_workers: Maximum number of parallel workers
+
+        Returns:
+            Dictionary mapping snapshot ID to Snapshot (or None if not found)
+        """
+        if not snapshot_ids:
+            return {}
+
+        # First, batch check which snapshot files exist
+        paths = [self.get_snapshot_path(sid) for sid in snapshot_ids]
+        existence = self.file_io.exists_batch(paths)
+
+        # Filter to only existing snapshots
+        existing_ids = [
+            sid for sid, path in zip(snapshot_ids, paths)
+            if existence.get(path, False)
+        ]
+
+        if not existing_ids:
+            return {sid: None for sid in snapshot_ids}
+
+        # Fetch existing snapshots in parallel
+        def fetch_one(sid: int) -> tuple:
+            try:
+                return (sid, self.get_snapshot_by_id(sid))
+            except Exception:
+                return (sid, None)
+
+        results = {sid: None for sid in snapshot_ids}
+
+        with ThreadPoolExecutor(max_workers=min(len(existing_ids), 
max_workers)) as executor:
+            for sid, snapshot in executor.map(fetch_one, existing_ids):
+                results[sid] = snapshot
+
+        return results
+
+    def find_next_scannable(
+        self,
+        start_id: int,
+        should_scan: Callable[[Snapshot], bool],
+        lookahead_size: int = 10,
+        max_workers: int = 4
+    ) -> tuple:
+        """
+        Find the next scannable snapshot using batch lookahead.
+
+        This method batch-fetches multiple snapshots ahead and finds the first
+        one that passes the should_scan filter. This is more efficient than
+        sequential fetching when many snapshots may be skipped (e.g., COMPACT
+        commits when only APPEND is desired).

Review Comment:
   ```suggestion
   ```



##########
paimon-python/pypaimon/manifest/manifest_file_manager.py:
##########
@@ -16,27 +16,27 @@
 # limitations under the License.
 
################################################################################
 from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
 from io import BytesIO
-from typing import List
+from typing import List, Optional
 
 import fastavro
-
-from datetime import datetime
+from cachetools import LRUCache
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.binary_row import BinaryRow
 from pypaimon.table.row.generic_row import (GenericRowDeserializer,
                                             GenericRowSerializer)
-from pypaimon.table.row.binary_row import BinaryRow
 
 
 class ManifestFileManager:
     """Writer for manifest files in Avro format using unified FileIO."""
 
-    def __init__(self, table):
+    def __init__(self, table, cache_max_size: int = 100):

Review Comment:
   This enables caching by default - is this safe to do? Should we leave it off 
by default and allow it to be configured?



##########
paimon-python/pypaimon/manifest/manifest_list_manager.py:
##########
@@ -50,10 +58,60 @@ def read_all(self, snapshot: Optional[Snapshot]) -> 
List[ManifestFileMeta]:
         manifest_files.extend(delta_manifests)
         return manifest_files
 
+    def read_base(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+        """

Review Comment:
   Too verbose



##########
paimon-python/pypaimon/manifest/manifest_list_manager.py:
##########
@@ -20,6 +20,7 @@
 from typing import List, Optional
 
 import fastavro
+from cachetools import LRUCache

Review Comment:
   If instead we use @cachetools.func.lru_cache with cache_info=True then the 
stats are handled for us already.



##########
paimon-python/pypaimon/manifest/manifest_list_manager.py:
##########
@@ -32,14 +33,21 @@
 class ManifestListManager:
     """Manager for manifest list files in Avro format using unified FileIO."""
 
-    def __init__(self, table):
+    def __init__(self, table, cache_max_size: int = 50):

Review Comment:
   Is it safe to enable this by default?



##########
paimon-python/pypaimon/snapshot/snapshot_manager.py:
##########
@@ -37,6 +43,11 @@ def __init__(self, table):
         self.snapshot_dir = f"{snapshot_path}/snapshot"
         self.latest_file = f"{self.snapshot_dir}/LATEST"
 
+        # LRU cache for snapshots (like Java's Cache<Path, Snapshot>)

Review Comment:
   again, lets try to use the decorator here with cache_info to avoid 
boilerplate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to