This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87a4c91747 [python] Backtick quoting for identifiers, exists_batch optimization (#7347)
87a4c91747 is described below
commit 87a4c917478f89d96c91f5ce50cb036514f7ef8a
Author: Toby Cole <[email protected]>
AuthorDate: Tue Mar 10 00:24:17 2026 +0000
[python] Backtick quoting for identifiers, exists_batch optimization (#7347)
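- Add backtick-aware parsing to `Identifier.from_string()` so a quoted part may contain periods (e.g. "`db.name`.table" parses as database "db.name"); unquoted names now split on the first period only, matching Java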
- Add `exists_batch()` for bulk file existence checks (per-path fallback in `FileIO`; a single batched `get_file_info` call in `PyArrowFileIO`)
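- Add `read_base()` and `read_changelog()` to the manifest list manager (the latter for future streaming reads)
- Skip gaps left by expired snapshots in the snapshot manager's timestamp binary search
- Compute manifest entry identity locally when merging ADD/DELETE entries in the manifest file manager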
---
paimon-python/pypaimon/common/file_io.py | 34 ++--
paimon-python/pypaimon/common/identifier.py | 50 +++++-
.../pypaimon/filesystem/pyarrow_file_io.py | 52 ++++--
.../pypaimon/manifest/manifest_file_manager.py | 15 +-
.../pypaimon/manifest/manifest_list_manager.py | 15 +-
.../pypaimon/snapshot/snapshot_manager.py | 37 ++--
paimon-python/pypaimon/tests/identifier_test.py | 97 ++++++++++
.../tests/manifest/manifest_manager_test.py | 196 +++++++++++++++++++++
8 files changed, 441 insertions(+), 55 deletions(-)
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/common/file_io.py
index 31539a9f00..a9dfbbdbd4 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -19,7 +19,7 @@ import logging
import uuid
from abc import ABC, abstractmethod
from pathlib import Path
-from typing import List, Optional
+from typing import Dict, List, Optional
import pyarrow # noqa: F401
import pyarrow.fs as pafs
@@ -31,7 +31,7 @@ class FileIO(ABC):
"""
File IO interface to read and write files.
"""
-
+
@abstractmethod
def new_input_stream(self, path: str):
pass
@@ -39,11 +39,11 @@ class FileIO(ABC):
@abstractmethod
def new_output_stream(self, path: str):
pass
-
+
@abstractmethod
def get_file_status(self, path: str):
pass
-
+
@abstractmethod
def list_status(self, path: str):
pass
@@ -52,18 +52,22 @@ class FileIO(ABC):
def exists(self, path: str) -> bool:
pass
+ def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
+ """Check existence of multiple paths, returning {path: bool}."""
+ return {path: self.exists(path) for path in paths}
+
@abstractmethod
def delete(self, path: str, recursive: bool = False) -> bool:
pass
-
+
@abstractmethod
def mkdirs(self, path: str) -> bool:
pass
-
+
@abstractmethod
def rename(self, src: str, dst: str) -> bool:
pass
-
+
def delete_quietly(self, path: str):
logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
@@ -115,7 +119,7 @@ class FileIO(ABC):
if self.exists(path):
if self.is_dir(path):
return False
-
+
temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
@@ -143,7 +147,7 @@ class FileIO(ABC):
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent
-
+
if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))
@@ -191,8 +195,8 @@ class FileIO(ABC):
return path
def parse_location(self, location: str):
- from urllib.parse import urlparse
import os
+ from urllib.parse import urlparse
uri = urlparse(location)
if not uri.scheme:
@@ -249,10 +253,10 @@ class FileIO(ABC):
def write_blob(self, path: str, data, **kwargs):
"""Write Blob format file."""
raise NotImplementedError("write_blob must be implemented by FileIO
subclasses")
-
+
def close(self):
pass
-
+
@staticmethod
def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
"""
@@ -261,13 +265,13 @@ class FileIO(ABC):
- PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
"""
from urllib.parse import urlparse
-
+
uri = urlparse(path)
scheme = uri.scheme
-
+
if not scheme or scheme == "file":
from pypaimon.filesystem.local_file_io import LocalFileIO
return LocalFileIO(path, catalog_options)
-
+
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
return PyArrowFileIO(path, catalog_options or Options({}))
diff --git a/paimon-python/pypaimon/common/identifier.py
b/paimon-python/pypaimon/common/identifier.py
index 6851a18074..10b008ae96 100755
--- a/paimon-python/pypaimon/common/identifier.py
+++ b/paimon-python/pypaimon/common/identifier.py
@@ -37,13 +37,49 @@ class Identifier:
@classmethod
def from_string(cls, full_name: str) -> "Identifier":
- parts = full_name.split(".")
- if len(parts) == 2:
- return cls(parts[0], parts[1])
- elif len(parts) == 3:
- return cls(parts[0], parts[1], parts[2])
- else:
- raise ValueError("Invalid identifier format: {}".format(full_name))
+ """Parse a 'database.object' identifier, with optional backtick
quoting."""
+ if not full_name or not full_name.strip():
+ raise ValueError("fullName cannot be null or empty")
+
+ # Check if backticks are used - if so, parse with backtick support
+ if '`' in full_name:
+ return cls._parse_with_backticks(full_name)
+
+ # Otherwise, use Java-compatible split on first period only
+ parts = full_name.split(".", 1)
+
+ if len(parts) != 2:
+ raise ValueError(
+ f"Cannot get splits from '{full_name}' to get database and
object"
+ )
+
+ return cls(parts[0], parts[1])
+
+ @classmethod
+ def _parse_with_backticks(cls, full_name: str) -> "Identifier":
+ parts = []
+ current = ""
+ in_backticks = False
+
+ for char in full_name:
+ if char == '`':
+ in_backticks = not in_backticks
+ elif char == '.' and not in_backticks:
+ parts.append(current)
+ current = ""
+ else:
+ current += char
+
+ if current:
+ parts.append(current)
+
+ if in_backticks:
+ raise ValueError(f"Unclosed backtick in identifier: {full_name}")
+
+ if len(parts) != 2:
+ raise ValueError(f"Invalid identifier format: {full_name}")
+
+ return cls(parts[0], parts[1])
def get_full_name(self) -> str:
if self.branch:
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 55f055ae9a..c4b64445f7 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -34,8 +34,9 @@ from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
-from pypaimon.schema.data_types import DataField, AtomicType,
PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
+from pypaimon.schema.data_types import (AtomicType, DataField,
+ PyarrowFieldParser)
+from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -189,7 +190,7 @@ class PyArrowFileIO(FileIO):
parent_dir = '/'.join(path_str.split('/')[:-1])
else:
parent_dir = ''
-
+
if parent_dir and not self.exists(parent_dir):
self.mkdirs(parent_dir)
else:
@@ -214,10 +215,10 @@ class PyArrowFileIO(FileIO):
def get_file_status(self, path: str):
path_str = self.to_filesystem_path(path)
file_info = self._get_file_info(path_str)
-
+
if file_info.type == pafs.FileType.NotFound:
raise FileNotFoundError(f"File {path} (resolved as {path_str})
does not exist")
-
+
return file_info
def list_status(self, path: str):
@@ -233,13 +234,25 @@ class PyArrowFileIO(FileIO):
path_str = self.to_filesystem_path(path)
return self._get_file_info(path_str).type != pafs.FileType.NotFound
+ def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
+ """Check existence of multiple paths in a single batched API call."""
+ if not paths:
+ return {}
+
+ path_strs = [self.to_filesystem_path(p) for p in paths]
+ file_infos = self.filesystem.get_file_info(path_strs)
+ return {
+ paths[i]: info.type != pyarrow.fs.FileType.NotFound
+ for i, info in enumerate(file_infos)
+ }
+
def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self._get_file_info(path_str)
-
+
if file_info.type == pafs.FileType.NotFound:
return False
-
+
if file_info.type == pafs.FileType.Directory:
if not recursive:
selector = pafs.FileSelector(path_str, recursive=False,
allow_not_found=True)
@@ -258,7 +271,7 @@ class PyArrowFileIO(FileIO):
def mkdirs(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self._get_file_info(path_str)
-
+
if file_info.type == pafs.FileType.NotFound:
self.filesystem.create_dir(path_str, recursive=True)
return True
@@ -266,7 +279,7 @@ class PyArrowFileIO(FileIO):
return True
elif file_info.type == pafs.FileType.File:
raise FileExistsError(f"Path exists but is not a directory:
{path}")
-
+
self.filesystem.create_dir(path_str, recursive=True)
return True
@@ -275,13 +288,13 @@ class PyArrowFileIO(FileIO):
dst_parent = Path(dst_str).parent
if str(dst_parent) and not self.exists(str(dst_parent)):
self.mkdirs(str(dst_parent))
-
+
src_str = self.to_filesystem_path(src)
-
+
try:
if hasattr(self.filesystem, 'rename'):
return self.filesystem.rename(src_str, dst_str)
-
+
dst_file_info = self._get_file_info(dst_str)
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
@@ -293,7 +306,7 @@ class PyArrowFileIO(FileIO):
final_dst_info = self._get_file_info(dst_str)
if final_dst_info.type != pafs.FileType.NotFound:
return False
-
+
self.filesystem.move(src_str, dst_str)
return True
except FileNotFoundError:
@@ -331,7 +344,7 @@ class PyArrowFileIO(FileIO):
file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.Directory:
return False
-
+
temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
@@ -349,7 +362,7 @@ class PyArrowFileIO(FileIO):
source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent
-
+
if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))
@@ -373,13 +386,14 @@ class PyArrowFileIO(FileIO):
zstd_level: int = 1, **kwargs):
try:
"""Write ORC file using PyArrow ORC writer.
-
+
Note: PyArrow's ORC writer doesn't support compression_level
parameter.
ORC files will use zstd compression with default level
(which is 3, see
https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c)
instead of the specified level.
"""
import sys
+
import pyarrow.orc as orc
data = self._cast_time_columns_for_orc(data)
@@ -434,7 +448,7 @@ class PyArrowFileIO(FileIO):
'zstd': 'zstandard', # zstd is commonly used in Paimon
}
compression_lower = compression.lower()
-
+
codec = codec_map.get(compression_lower)
if codec is None:
raise ValueError(
@@ -450,6 +464,7 @@ class PyArrowFileIO(FileIO):
def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
try:
import lance
+
from pypaimon.read.reader.lance_utils import to_lance_specified
file_path_for_lance, storage_options = to_lance_specified(self,
path)
@@ -516,9 +531,10 @@ class PyArrowFileIO(FileIO):
raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
def to_filesystem_path(self, path: str) -> str:
- from pyarrow.fs import S3FileSystem
import re
+ from pyarrow.fs import S3FileSystem
+
parsed = urlparse(path)
normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else
''
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 5975fcbc9f..0ed5091825 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -53,6 +53,17 @@ class ManifestFileManager:
def _process_single_manifest(manifest_file: ManifestFileMeta) ->
List[ManifestEntry]:
return self.read(manifest_file.file_name, manifest_entry_filter,
drop_stats)
+ def _entry_identifier(e: ManifestEntry) -> tuple:
+ return (
+ tuple(e.partition.values),
+ e.bucket,
+ e.file.level,
+ e.file.file_name,
+ tuple(e.file.extra_files) if e.file.extra_files else (),
+ e.file.embedded_index,
+ e.file.external_path,
+ )
+
deleted_entry_keys = set()
added_entries = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -62,11 +73,11 @@ class ManifestFileManager:
if entry.kind == 0: # ADD
added_entries.append(entry)
else: # DELETE
- deleted_entry_keys.add(entry.identifier())
+ deleted_entry_keys.add(_entry_identifier(entry))
final_entries = [
entry for entry in added_entries
- if entry.identifier() not in deleted_entry_keys
+ if _entry_identifier(entry) not in deleted_entry_keys
]
return final_entries
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 0bc0cc2b88..bc14fd86e4 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -20,7 +20,6 @@ from io import BytesIO
from typing import List, Optional
import fastavro
-
from pypaimon.manifest.schema.manifest_file_meta import (
MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
from pypaimon.manifest.schema.simple_stats import SimpleStats
@@ -50,10 +49,24 @@ class ManifestListManager:
manifest_files.extend(delta_manifests)
return manifest_files
+ def read_base(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+ """Read only the base manifest list for the given snapshot."""
+ return self.read(snapshot.base_manifest_list)
+
def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
return self.read(snapshot.delta_manifest_list)
+ def read_changelog(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+ """Read changelog manifest files from snapshot, or empty list if
none."""
+ if snapshot.changelog_manifest_list is None:
+ return []
+ return self.read(snapshot.changelog_manifest_list)
+
def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
+ return self._read_from_storage(manifest_list_name)
+
+ def _read_from_storage(self, manifest_list_name: str) ->
List[ManifestFileMeta]:
+ """Read manifest list from storage."""
manifest_files = []
manifest_list_path = f"{self.manifest_path}/{manifest_list_name}"
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index e6b2ca5c64..61678e2d96 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -102,15 +102,7 @@ class SnapshotManager:
return str(max_snapshot_id)
def get_snapshot_path(self, snapshot_id: int) -> str:
- """
- Get the path for a snapshot file.
-
- Args:
- snapshot_id: The snapshot ID
-
- Returns:
- Path to the snapshot file
- """
+ """Get the file path for the given snapshot ID."""
return f"{self.snapshot_dir}/snapshot-{snapshot_id}"
def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
@@ -137,13 +129,35 @@ class SnapshotManager:
Returns:
The latest snapshot with time_millis <= timestamp, or None if no
such snapshot exists
"""
- earliest = 1
- latest = self.get_latest_snapshot().id
+ earliest_snap = self.try_get_earliest_snapshot()
+ latest_snap = self.get_latest_snapshot()
+
+ if earliest_snap is None or latest_snap is None:
+ return None
+
+ earliest = earliest_snap.id
+ latest = latest_snap.id
final_snapshot = None
while earliest <= latest:
mid = earliest + (latest - earliest) // 2
snapshot = self.get_snapshot_by_id(mid)
+
+ # Handle gaps in snapshot sequence (expired snapshots)
+ if snapshot is None:
+ # Search forward to find next existing snapshot
+ found = False
+ for i in range(mid + 1, latest + 1):
+ snapshot = self.get_snapshot_by_id(i)
+ if snapshot is not None:
+ mid = i
+ found = True
+ break
+ if not found:
+ # No snapshots from mid to latest, search lower half
+ latest = mid - 1
+ continue
+
commit_time = snapshot.time_millis
if commit_time > timestamp:
@@ -170,6 +184,5 @@ class SnapshotManager:
snapshot_file = self.get_snapshot_path(snapshot_id)
if not self.file_io.exists(snapshot_file):
return None
-
snapshot_content = self.file_io.read_file_utf8(snapshot_file)
return JSON.from_json(snapshot_content, Snapshot)
diff --git a/paimon-python/pypaimon/tests/identifier_test.py
b/paimon-python/pypaimon/tests/identifier_test.py
new file mode 100644
index 0000000000..7bb6bb548a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/identifier_test.py
@@ -0,0 +1,97 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Tests for Identifier parsing, including backtick support for database names
with periods.
+"""
+
+import unittest
+
+from pypaimon.common.identifier import Identifier
+
+
+class IdentifierTest(unittest.TestCase):
+ """Tests for Identifier.from_string()."""
+
+ def test_simple_identifier(self):
+ """Simple database.table parsing."""
+ identifier = Identifier.from_string("mydb.mytable")
+ self.assertEqual(identifier.database, "mydb")
+ self.assertEqual(identifier.object, "mytable")
+
+ def test_java_compatible_split_on_first_period(self):
+ """Java-compatible: splits on first period only, allowing periods in
table name."""
+ identifier = Identifier.from_string("mydb.my.table.name")
+ self.assertEqual(identifier.database, "mydb")
+ self.assertEqual(identifier.object, "my.table.name")
+
+ def test_backtick_quoted_database_name_with_period(self):
+ """Backtick-quoted database name containing a period."""
+ identifier = Identifier.from_string("`db.name`.table_name")
+ self.assertEqual(identifier.database, "db.name")
+ self.assertEqual(identifier.object, "table_name")
+
+ def test_backtick_quoted_both_parts(self):
+ """Both database and table names backtick-quoted."""
+ identifier = Identifier.from_string("`db.name`.`table.name`")
+ self.assertEqual(identifier.database, "db.name")
+ self.assertEqual(identifier.object, "table.name")
+
+ def test_backtick_quoted_database_only(self):
+ """Only database name backtick-quoted."""
+ identifier = Identifier.from_string("`my.database`.simple_table")
+ self.assertEqual(identifier.database, "my.database")
+ self.assertEqual(identifier.object, "simple_table")
+
+ def test_get_full_name(self):
+ """get_full_name() returns database.object format."""
+ identifier = Identifier.create("mydb", "mytable")
+ self.assertEqual(identifier.get_full_name(), "mydb.mytable")
+
+ def test_get_full_name_with_branch(self):
+ """get_full_name() includes branch when set."""
+ identifier = Identifier(database="mydb", object="mytable",
branch="feature")
+ self.assertEqual(identifier.get_full_name(), "mydb.mytable.feature")
+
+ def test_empty_string_raises_error(self):
+ """Empty string should raise ValueError."""
+ with self.assertRaises(ValueError):
+ Identifier.from_string("")
+
+ def test_whitespace_only_raises_error(self):
+ """Whitespace-only string should raise ValueError."""
+ with self.assertRaises(ValueError):
+ Identifier.from_string(" ")
+
+ def test_no_period_raises_error(self):
+ """String without period should raise ValueError."""
+ with self.assertRaises(ValueError):
+ Identifier.from_string("nodothere")
+
+ def test_unclosed_backtick_raises_error(self):
+ """Unclosed backtick should raise ValueError."""
+ with self.assertRaises(ValueError):
+ Identifier.from_string("`unclosed.db.mytable")
+
+ def test_invalid_backtick_format_raises_error(self):
+ """Invalid backtick format (too many parts) should raise ValueError."""
+ with self.assertRaises(ValueError):
+ Identifier.from_string("`a`.`b`.`c`")
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
new file mode 100644
index 0000000000..60e92407d3
--- /dev/null
+++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
@@ -0,0 +1,196 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import CatalogOptions
+from pypaimon.data.timestamp import Timestamp
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.schema import Schema
+from pypaimon.table.row.generic_row import GenericRow
+
+_EMPTY_ROW = GenericRow([], [])
+_EMPTY_STATS = SimpleStats(min_values=_EMPTY_ROW, max_values=_EMPTY_ROW,
null_counts=[])
+
+
+class _ManifestManagerSetup(unittest.TestCase):
+ """Shared setup for manifest manager tests.
+
+ Subclasses must set _table_name and implement _make_manager / _write_one.
+ """
+
+ _table_name: str
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.catalog = FileSystemCatalog(
+ Options({CatalogOptions.WAREHOUSE.key(): cls.tempdir})
+ )
+ cls.catalog.create_database('default', False)
+
+ def setUp(self):
+ table_id = f'default.{self._table_name}'
+ try:
+ table_identifier = Identifier.from_string(table_id)
+ table_path = self.catalog.get_table_path(table_identifier)
+ if self.catalog.file_io.exists(table_path):
+ self.catalog.file_io.delete(table_path, recursive=True)
+ except Exception:
+ pass
+
+ pa_schema = pa.schema([('id', pa.int32()), ('value', pa.string())])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.catalog.create_table(table_id, schema, False)
+ self.table = self.catalog.get_table(table_id)
+
+ def _make_manager(self):
+ raise NotImplementedError
+
+ def _write_one(self, manager, name):
+ """Write a single item that can be read back by manager.read(name)."""
+ raise NotImplementedError
+
+
+class ManifestFileManagerTest(_ManifestManagerSetup):
+ """Tests for ManifestFileManager."""
+
+ _table_name = 'manager_test'
+
+ def _make_manager(self):
+ return ManifestFileManager(self.table)
+
+ def _write_one(self, manager, name):
+ entry = ManifestEntry(
+ kind=0,
+ partition=_EMPTY_ROW,
+ bucket=0,
+ total_buckets=1,
+ file=DataFileMeta(
+ file_name="data.parquet", file_size=1024, row_count=100,
+ min_key=_EMPTY_ROW, max_key=_EMPTY_ROW,
+ key_stats=_EMPTY_STATS, value_stats=_EMPTY_STATS,
+ min_sequence_number=1, max_sequence_number=100,
+ schema_id=0, level=0, extra_files=[],
+ creation_time=Timestamp.from_epoch_millis(0),
+ delete_row_count=0, embedded_index=None, file_source=None,
+ value_stats_cols=None, external_path=None,
+ first_row_id=None, write_cols=None,
+ ),
+ )
+ manager.write(name, [entry])
+
+ def _create_manifest_entry(self, file_name, bucket=0):
+ entry = ManifestEntry(
+ kind=0,
+ partition=_EMPTY_ROW,
+ bucket=bucket,
+ total_buckets=1,
+ file=DataFileMeta(
+ file_name=file_name, file_size=1024, row_count=100,
+ min_key=_EMPTY_ROW, max_key=_EMPTY_ROW,
+ key_stats=_EMPTY_STATS, value_stats=_EMPTY_STATS,
+ min_sequence_number=1, max_sequence_number=100,
+ schema_id=0, level=0, extra_files=[],
+ creation_time=Timestamp.from_epoch_millis(0),
+ delete_row_count=0, embedded_index=None, file_source=None,
+ value_stats_cols=None, external_path=None,
+ first_row_id=None, write_cols=None,
+ ),
+ )
+ return entry
+
+ def test_filter_applied_after_read(self):
+ manager = self._make_manager()
+
+ entries = [
+ self._create_manifest_entry("data-1.parquet", bucket=0),
+ self._create_manifest_entry("data-2.parquet", bucket=1),
+ self._create_manifest_entry("data-3.parquet", bucket=0),
+ ]
+ manager.write("test-manifest.avro", entries)
+
+ result_all = manager.read("test-manifest.avro")
+ self.assertEqual(len(result_all), 3)
+
+ result_filtered = manager.read(
+ "test-manifest.avro", manifest_entry_filter=lambda e: e.bucket ==
0)
+ self.assertEqual(len(result_filtered), 2)
+
+
+class ManifestListManagerTest(_ManifestManagerSetup):
+ """Tests for ManifestListManager."""
+
+ _table_name = 'list_manager_test'
+
+ def _make_manager(self):
+ return ManifestListManager(self.table)
+
+ def _write_one(self, manager, name):
+ meta = ManifestFileMeta(
+ file_name="manifest.avro", file_size=1024,
+ num_added_files=1, num_deleted_files=0,
+ partition_stats=SimpleStats.empty_stats(), schema_id=0,
+ )
+ manager.write(name, [meta])
+
+ def _make_snapshot(self, base_manifest_list,
delta_manifest_list="delta-manifest-list"):
+ from pypaimon.snapshot.snapshot import Snapshot
+ return Snapshot(
+ version=3, id=1, schema_id=0,
+ base_manifest_list=base_manifest_list,
+ delta_manifest_list=delta_manifest_list,
+ commit_user="test", commit_identifier=1, commit_kind="APPEND",
+ time_millis=1234567890, total_record_count=100,
delta_record_count=10,
+ )
+
+ def test_read_base_returns_only_base_manifest(self):
+ manager = self._make_manager()
+
+ base_meta = ManifestFileMeta(
+ file_name="manifest-base.avro", file_size=1024,
+ num_added_files=1, num_deleted_files=0,
+ partition_stats=SimpleStats.empty_stats(), schema_id=0,
+ )
+ delta_meta = ManifestFileMeta(
+ file_name="manifest-delta.avro", file_size=1024,
+ num_added_files=1, num_deleted_files=0,
+ partition_stats=SimpleStats.empty_stats(), schema_id=0,
+ )
+ manager.write("base-manifest-list", [base_meta])
+ manager.write("delta-manifest-list", [delta_meta])
+
+ snapshot = self._make_snapshot("base-manifest-list",
"delta-manifest-list")
+ result = manager.read_base(snapshot)
+
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0].file_name, "manifest-base.avro")
+
+
+if __name__ == '__main__':
+ unittest.main()