This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4aec277912 [python] Align SnapshotManager constructor with Java (#7758)
4aec277912 is described below
commit 4aec277912538a92d030492d563970fbbbb50a4e
Author: chaoyang <[email protected]>
AuthorDate: Fri May 8 11:19:01 2026 +0800
[python] Align SnapshotManager constructor with Java (#7758)
Switch the Python `SnapshotManager.__init__` from `(table, branch=None)` to the
Java-aligned `(file_io, table_path, branch=None, snapshot_loader=None)`,
so the class no longer depends on `FileStoreTable`. Mirrors
`paimon-core/.../utils/SnapshotManager.java`'s constructor (which takes
`FileIO, Path tablePath, @Nullable String branchName, @Nullable
SnapshotLoader, @Nullable Cache`); the `Cache` parameter is left out of
scope (Caffeine has no Python equivalent yet).
The previous Python constructor stored `self.table = table` and reached
through `self.table.file_io` /
`self.table.catalog_environment.snapshot_loader()` /
`self.table.current_branch()` to populate fields. That made
`SnapshotManager` a layering violation -- the snapshot layer reaching
back into the table layer -- and forced mock tests to stub a
four-attribute mock table just to construct the manager. The new
constructor takes the file IO, table path, branch, and snapshot loader directly.
---
.../acceptance/incremental_diff_acceptance_test.py | 5 +-
.../pypaimon/globalindex/global_index_scanner.py | 3 +-
.../pypaimon/read/scanner/file_scanner.py | 3 +-
.../pypaimon/read/streaming_table_scan.py | 3 +-
paimon-python/pypaimon/read/table_scan.py | 3 +-
.../pypaimon/snapshot/snapshot_manager.py | 37 +++++-----
paimon-python/pypaimon/table/file_store_table.py | 7 +-
.../pypaimon/table/source/full_text_scan.py | 3 +-
.../pypaimon/table/source/vector_search_scan.py | 3 +-
paimon-python/pypaimon/tests/blob_table_test.py | 3 +-
.../pypaimon/tests/branch_manager_test.py | 4 +-
.../pypaimon/tests/changelog_manager_test.py | 3 +-
.../pypaimon/tests/data_evolution_test.py | 3 +-
.../pypaimon/tests/file_store_commit_test.py | 19 +++--
.../pypaimon/tests/partition_predicate_test.py | 2 -
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 5 +-
.../pypaimon/tests/reader_append_only_test.py | 11 ++-
paimon-python/pypaimon/tests/reader_base_test.py | 9 ++-
.../pypaimon/tests/reader_primary_key_test.py | 9 ++-
.../pypaimon/tests/snapshot_manager_test.py | 76 ++++++--------------
.../pypaimon/tests/streaming_table_scan_test.py | 83 ++++++++++------------
paimon-python/pypaimon/write/file_store_commit.py | 3 +-
22 files changed, 124 insertions(+), 173 deletions(-)
diff --git
a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
index 0dc7ebf53f..a6a56850ff 100644
--- a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
+++ b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
@@ -38,7 +38,6 @@ from pypaimon.read.scanner.append_table_split_generator
import \
AppendTableSplitGenerator
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
class IncrementalDiffAcceptanceTest(unittest.TestCase):
@@ -94,7 +93,7 @@ class IncrementalDiffAcceptanceTest(unittest.TestCase):
def _read_via_diff(self, table, start_snap_id, end_snap_id):
"""Read data using IncrementalDiffScanner between two snapshots."""
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id)
end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id)
@@ -106,7 +105,7 @@ class IncrementalDiffAcceptanceTest(unittest.TestCase):
def _read_via_delta(self, table, start_snap_id, end_snap_id):
"""Read data by iterating delta_manifest_lists between two
snapshots."""
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
manifest_list_manager = ManifestListManager(table)
manifest_file_manager = ManifestFileManager(table)
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index 38408059a9..c51be0731b 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -111,8 +111,7 @@ class GlobalIndexScanner:
return False
return global_index_meta.index_field_id in filter_field_ids
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
- snapshot = SnapshotManager(table).get_latest_snapshot()
+ snapshot = table.snapshot_manager().get_latest_snapshot()
index_file_handler = IndexFileHandler(table=table)
entries = index_file_handler.scan(snapshot, index_file_filter)
scanned_index_files = [entry.index_file for entry in entries]
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index ffbd83daf0..39c7401746 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -41,7 +41,6 @@ from pypaimon.read.scanner.primary_key_table_split_generator
import \
PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.source.deletion_file import DeletionFile
@@ -179,7 +178,7 @@ class FileScanner:
self.predicate_for_stats = remove_row_id_filter(predicate) if
predicate else None
self.limit = limit
- self.snapshot_manager = SnapshotManager(table)
+ self.snapshot_manager = table.snapshot_manager()
self.manifest_list_manager = ManifestListManager(table)
self.manifest_file_manager = ManifestFileManager(table)
diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py
b/paimon-python/pypaimon/read/streaming_table_scan.py
index f426d2064f..f7f8ca4182 100644
--- a/paimon-python/pypaimon/read/streaming_table_scan.py
+++ b/paimon-python/pypaimon/read/streaming_table_scan.py
@@ -48,7 +48,6 @@ from pypaimon.read.scanner.incremental_diff_scanner import \
from pypaimon.read.scanner.primary_key_table_split_generator import \
PrimaryKeyTableSplitGenerator
from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
class AsyncStreamingTableScan:
@@ -103,7 +102,7 @@ class AsyncStreamingTableScan:
self._lookahead_size = 10 # How many snapshots to look ahead
# Initialize managers
- self._snapshot_manager = SnapshotManager(table)
+ self._snapshot_manager = table.snapshot_manager()
self._manifest_list_manager = ManifestListManager(table)
self._manifest_file_manager = ManifestFileManager(table)
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index c754b50111..562bea26f5 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -23,7 +23,6 @@ from pypaimon.common.predicate import Predicate
from pypaimon.read.plan import Plan
from pypaimon.read.scanner.file_scanner import FileScanner
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
@@ -48,7 +47,7 @@ class TableScan:
def _create_file_scanner(self) -> FileScanner:
options = self.table.options.options
- snapshot_manager = SnapshotManager(self.table)
+ snapshot_manager = self.table.snapshot_manager()
manifest_list_manager = ManifestListManager(self.table)
if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
ts =
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 074fc844cd..6f2bef7225 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -29,24 +29,24 @@ from pypaimon.snapshot.snapshot_loader import SnapshotLoader
class SnapshotManager:
"""Manager for snapshot files using unified FileIO."""
- def __init__(self, table, branch: Optional[str] = None):
- # Lazy imports to avoid a cycle: pypaimon.branch.__init__
+ def __init__(
+ self,
+ file_io: FileIO,
+ table_path: str,
+ branch: Optional[str] = None,
+ snapshot_loader: Optional[SnapshotLoader] = None,
+ ):
+ # Lazy import to avoid a cycle: pypaimon.branch.__init__
# eagerly loads FileSystemBranchManager, which imports
# SnapshotManager.
from pypaimon.branch.branch_manager import BranchManager
- from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
- from pypaimon.table.file_store_table import FileStoreTable
-
- self.table: FileStoreTable = table
- self.file_io: FileIO = self.table.file_io
- self.snapshot_loader: Optional[SnapshotLoader] =
self.table.catalog_environment.snapshot_loader()
- if branch is None:
- branch = self.table.current_branch() or DEFAULT_MAIN_BRANCH
- self.branch = BranchManager.normalize_branch(branch)
+ self.file_io: FileIO = file_io
+ self.table_path: str = table_path.rstrip('/')
+ self.branch: str = BranchManager.normalize_branch(branch)
+ self.snapshot_loader: Optional[SnapshotLoader] = snapshot_loader
- table_root = self.table.table_path.rstrip('/')
- branch_root = BranchManager.branch_path(table_root, self.branch)
+ branch_root = BranchManager.branch_path(self.table_path, self.branch)
self.snapshot_dir = f"{branch_root}/snapshot"
self.latest_file = f"{self.snapshot_dir}/LATEST"
@@ -55,10 +55,13 @@ class SnapshotManager:
# carries a SnapshotLoader rebranched to ``branch_name`` so REST
# loads target the correct branch instead of falling back to the
# main-branch identifier.
- new_manager = SnapshotManager(self.table, branch_name)
- if self.snapshot_loader is not None:
- new_manager.snapshot_loader =
self.snapshot_loader.copy_with_branch(branch_name)
- return new_manager
+ rebranched_loader = (
+ self.snapshot_loader.copy_with_branch(branch_name)
+ if self.snapshot_loader is not None
+ else None
+ )
+ return SnapshotManager(
+ self.file_io, self.table_path, branch_name, rebranched_loader)
def get_latest_snapshot(self) -> Optional[Snapshot]:
"""
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 4b49a62869..35addad251 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -98,7 +98,12 @@ class FileStoreTable(Table):
def snapshot_manager(self):
"""Get the snapshot manager for this table."""
from pypaimon.snapshot.snapshot_manager import SnapshotManager
- return SnapshotManager(self)
+ return SnapshotManager(
+ self.file_io,
+ self.table_path,
+ self.current_branch(),
+ self.catalog_environment.snapshot_loader(),
+ )
def tag_manager(self):
"""Get the tag manager for this table."""
diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py
b/paimon-python/pypaimon/table/source/full_text_scan.py
index 06f72405b3..50b2381d27 100644
--- a/paimon-python/pypaimon/table/source/full_text_scan.py
+++ b/paimon-python/pypaimon/table/source/full_text_scan.py
@@ -53,10 +53,9 @@ class FullTextScanImpl(FullTextScan):
def scan(self) -> FullTextScanPlan:
from pypaimon.index.index_file_handler import IndexFileHandler
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
text_column = self._text_column
- snapshot = SnapshotManager(self._table).get_latest_snapshot()
+ snapshot = self._table.snapshot_manager().get_latest_snapshot()
from pypaimon.snapshot.time_travel_util import TimeTravelUtil
from pypaimon.common.options.options import Options
diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py
b/paimon-python/pypaimon/table/source/vector_search_scan.py
index 5245d41f19..035e51a846 100644
--- a/paimon-python/pypaimon/table/source/vector_search_scan.py
+++ b/paimon-python/pypaimon/table/source/vector_search_scan.py
@@ -60,7 +60,6 @@ class VectorSearchScanImpl(VectorSearchScan):
from pypaimon.common.options.options import Options
from pypaimon.index.index_file_handler import IndexFileHandler
from pypaimon.read.push_down_utils import _get_all_fields
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.snapshot.time_travel_util import TimeTravelUtil
vector_column = self._vector_column
@@ -81,7 +80,7 @@ class VectorSearchScanImpl(VectorSearchScan):
self._table.tag_manager()
)
if snapshot is None:
- snapshot = SnapshotManager(self._table).get_latest_snapshot()
+ snapshot = self._table.snapshot_manager().get_latest_snapshot()
index_file_handler = IndexFileHandler(table=self._table)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7e4d6c6d26..2692aa2ea0 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2748,7 +2748,6 @@ class DataBlobWriterTest(unittest.TestCase):
"""Test concurrent blob writes to verify retry mechanism works
correctly."""
import threading
from pypaimon import Schema
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
# Run the test 10 times to verify stability
iter_num = 2
@@ -2873,7 +2872,7 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertIn(b'BLOB_PATTERN_', blob, f"Blob {i} should
contain pattern")
# Verify snapshot count (should have num_threads snapshots)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
latest_snapshot = snapshot_manager.get_latest_snapshot()
self.assertIsNotNone(latest_snapshot,
f"Iteration {test_iteration}: Latest snapshot
should not be None")
diff --git a/paimon-python/pypaimon/tests/branch_manager_test.py
b/paimon-python/pypaimon/tests/branch_manager_test.py
index a9419825a7..76a4a287c9 100644
--- a/paimon-python/pypaimon/tests/branch_manager_test.py
+++ b/paimon-python/pypaimon/tests/branch_manager_test.py
@@ -243,8 +243,8 @@ class SnapshotManagerBranchAwarenessTest(unittest.TestCase):
self.assertIsNot(branch_sm.snapshot_loader, sm.snapshot_loader)
self.assertEqual(branch_sm.snapshot_loader.identifier.branch, "b1")
self.assertEqual(
- branch_sm.snapshot_loader.identifier.database,
- sm.snapshot_loader.identifier.database)
+ branch_sm.snapshot_loader.identifier.get_database_name(),
+ sm.snapshot_loader.identifier.get_database_name())
self.assertEqual(
branch_sm.snapshot_loader.identifier.get_table_name(),
sm.snapshot_loader.identifier.get_table_name())
diff --git a/paimon-python/pypaimon/tests/changelog_manager_test.py
b/paimon-python/pypaimon/tests/changelog_manager_test.py
index 63d9157e5e..24c079316a 100644
--- a/paimon-python/pypaimon/tests/changelog_manager_test.py
+++ b/paimon-python/pypaimon/tests/changelog_manager_test.py
@@ -121,9 +121,8 @@ class TestChangelogManager(unittest.TestCase):
def test_changelog_from_snapshot(self):
"""Test that Changelog can be created from a Snapshot."""
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
- snapshot_manager = SnapshotManager(self.table)
+ snapshot_manager = self.table.snapshot_manager()
snapshot = snapshot_manager.get_latest_snapshot()
if snapshot:
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py
b/paimon-python/pypaimon/tests/data_evolution_test.py
index b9ca4c7acc..edd3ac9007 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -29,7 +29,6 @@ import pyarrow.dataset as ds
from pypaimon import CatalogFactory, Schema
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.manifest_list_manager import ManifestListManager
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.offset_row import OffsetRow
@@ -192,7 +191,7 @@ class DataEvolutionTest(unittest.TestCase):
# Assert manifest file meta contains min and max row id
manifest_list_manager = ManifestListManager(table)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
all_manifests =
manifest_list_manager.read_all(snapshot_manager.get_latest_snapshot())
first_commit = next((m for m in all_manifests if m.min_row_id == 0 and
m.max_row_id == 1), None)
self.assertIsNotNone(first_commit, "Should have a manifest with
min_row_id=0, max_row_id=1")
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py
b/paimon-python/pypaimon/tests/file_store_commit_test.py
index 958ea85a6b..d4943a6952 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -28,7 +28,6 @@ from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit
-@patch('pypaimon.write.file_store_commit.SnapshotManager')
@patch('pypaimon.write.file_store_commit.ManifestFileManager')
@patch('pypaimon.write.file_store_commit.ManifestListManager')
class TestFileStoreCommit(unittest.TestCase):
@@ -55,7 +54,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
def test_generate_partition_statistics_single_partition_single_file(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with single partition and
single file."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
@@ -107,7 +106,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat.last_file_creation_time, expected_time)
def test_generate_partition_statistics_multiple_files_same_partition(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with multiple files in same
partition."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
@@ -171,7 +170,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat.last_file_creation_time, expected_time)
def test_generate_partition_statistics_multiple_partitions(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with multiple different
partitions."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
@@ -259,7 +258,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat_2.file_size_in_bytes, 2 * 1024 * 1024)
def test_generate_partition_statistics_unpartitioned_table(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation for unpartitioned table."""
# Update mock table to have no partition keys
self.mock_table.partition_keys = []
@@ -310,7 +309,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
def test_generate_partition_statistics_no_creation_time(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation when file has no creation
time."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
@@ -347,7 +346,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertGreater(stat.last_file_creation_time, 0)
def test_generate_partition_statistics_mismatched_partition_keys(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation when partition tuple doesn't
match partition keys."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
@@ -393,7 +392,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat.spec, expected_spec)
def test_generate_partition_statistics_empty_commit_messages(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with empty commit messages
list."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
@@ -405,7 +404,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(len(statistics), 0)
def test_append_commit_inherits_index_manifest(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
file_store_commit = self._create_file_store_commit()
self.mock_table.identifier = 'default.test_table'
@@ -448,7 +447,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
def test_null_partition_value(
- self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
+ self, mock_manifest_list_manager, mock_manifest_file_manager):
from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import DataField, AtomicType
diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py
b/paimon-python/pypaimon/tests/partition_predicate_test.py
index a9c9482373..1dbc9a150f 100644
--- a/paimon-python/pypaimon/tests/partition_predicate_test.py
+++ b/paimon-python/pypaimon/tests/partition_predicate_test.py
@@ -96,7 +96,6 @@ def _manifest_entry(partition_values):
)
-@patch('pypaimon.read.scanner.file_scanner.SnapshotManager')
@patch('pypaimon.read.scanner.file_scanner.ManifestFileManager')
@patch('pypaimon.read.scanner.file_scanner.ManifestListManager')
class TestFileScannerPartitionPredicate(unittest.TestCase):
@@ -154,7 +153,6 @@ class TestFileScannerPartitionPredicate(unittest.TestCase):
_manifest_entry(['2024-01-15', 'us-west-2'])))
-@patch('pypaimon.write.file_store_commit.SnapshotManager')
@patch('pypaimon.write.file_store_commit.ManifestFileManager')
@patch('pypaimon.write.file_store_commit.ManifestListManager')
class TestOverwritePartitionPredicate(unittest.TestCase):
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index cfdf33b755..5258aec1d0 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -37,7 +37,6 @@ from pypaimon.manifest.schema.data_file_meta import
DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
GenericRowSerializer)
from pypaimon.table.row.row_kind import RowKind
@@ -180,7 +179,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
self.assertEqual(actual_data, expect_data)
# to test GenericRow ability
- latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ latest_snapshot = table.snapshot_manager().get_latest_snapshot()
manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
@@ -771,7 +770,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
timestamp = int(time.time() * 1000)
self._write_test_table(table)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
# test 1
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 00b2f803f7..d922cb2e30 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -31,7 +31,6 @@ from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.write.file_store_commit import RetryResult
@@ -137,7 +136,7 @@ class AoReaderTest(unittest.TestCase):
table_write.close()
table_commit.close()
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
snapshot = snapshot_manager.get_latest_snapshot()
table_inc = table.copy({
CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key():
@@ -478,7 +477,7 @@ class AoReaderTest(unittest.TestCase):
table_commit.commit(messages)
table_write.close()
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
latest_snapshot = snapshot_manager.get_latest_snapshot()
commit_entries = []
for msg in messages:
@@ -673,7 +672,7 @@ class AoReaderTest(unittest.TestCase):
timestamp = int(time.time() * 1000)
self._write_test_table(table)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
# test 1
@@ -713,7 +712,7 @@ class AoReaderTest(unittest.TestCase):
table_write.close()
table_commit.close()
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
@@ -820,7 +819,7 @@ class AoReaderTest(unittest.TestCase):
f"Iteration {test_iteration}: User IDs mismatch")
# Verify snapshot count (should have num_threads snapshots)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
latest_snapshot = snapshot_manager.get_latest_snapshot()
self.assertIsNotNone(latest_snapshot,
f"Iteration {test_iteration}: Latest snapshot
should not be None")
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py
b/paimon-python/pypaimon/tests/reader_base_test.py
index 657678f9ea..f1b147d147 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -38,7 +38,6 @@ from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
MapType, PyarrowFieldParser)
from pypaimon.schema.table_schema import TableSchema
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
from pypaimon.write.file_store_commit import FileStoreCommit
@@ -226,7 +225,7 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(actual_data, expect_data)
# to test GenericRow ability
- latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ latest_snapshot = table.snapshot_manager().get_latest_snapshot()
manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name, lambda row:
table_scan.file_scanner._filter_manifest_entry(row), False)
@@ -517,7 +516,7 @@ class ReaderBasicTest(unittest.TestCase):
pk_read_builder = pk_table.new_read_builder()
pk_table_scan = pk_read_builder.new_scan()
- latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
+ latest_snapshot = pk_table.snapshot_manager().get_latest_snapshot()
pk_manifest_files =
pk_table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
pk_manifest_entries =
pk_table_scan.file_scanner.manifest_file_manager.read(
pk_manifest_files[0].file_name,
@@ -592,7 +591,7 @@ class ReaderBasicTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
- latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ latest_snapshot = table.snapshot_manager().get_latest_snapshot()
manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
@@ -1105,7 +1104,7 @@ class ReaderBasicTest(unittest.TestCase):
# Read manifest to verify value_stats_cols is None (all fields
included)
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
- latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ latest_snapshot = table.snapshot_manager().get_latest_snapshot()
manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 541dabe895..36ec678c8c 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -26,7 +26,6 @@ import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
class PkReaderTest(unittest.TestCase):
@@ -339,7 +338,7 @@ class PkReaderTest(unittest.TestCase):
timestamp = int(time.time() * 1000)
self._write_test_table(table)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
# test 1
@@ -386,7 +385,7 @@ class PkReaderTest(unittest.TestCase):
table_write.close()
table_commit.close()
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
@@ -412,7 +411,7 @@ class PkReaderTest(unittest.TestCase):
self._write_test_table(table)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
latest_snapshot = snapshot_manager.get_latest_snapshot()
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
@@ -575,7 +574,7 @@ class PkReaderTest(unittest.TestCase):
f"Iteration {test_iteration}: User IDs mismatch")
# Verify snapshot count (should have num_threads snapshots)
- snapshot_manager = SnapshotManager(table)
+ snapshot_manager = table.snapshot_manager()
latest_snapshot = snapshot_manager.get_latest_snapshot()
self.assertIsNotNone(latest_snapshot,
f"Iteration {test_iteration}: Latest snapshot
should not be None")
diff --git a/paimon-python/pypaimon/tests/snapshot_manager_test.py
b/paimon-python/pypaimon/tests/snapshot_manager_test.py
index 0d1f1b1388..d2ac8360c5 100644
--- a/paimon-python/pypaimon/tests/snapshot_manager_test.py
+++ b/paimon-python/pypaimon/tests/snapshot_manager_test.py
@@ -33,64 +33,47 @@ def _create_mock_snapshot(snapshot_id: int, commit_kind:
str = "APPEND"):
return snapshot
+def _build_manager(file_io):
+ from pypaimon.snapshot.snapshot_manager import SnapshotManager
+ return SnapshotManager(file_io, "/tmp/test_table")
+
+
class SnapshotManagerTest(unittest.TestCase):
"""Tests for SnapshotManager batch lookahead methods."""
def test_find_next_scannable_returns_first_matching(self):
"""find_next_scannable should return the first snapshot that passes
should_scan."""
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
- table = Mock()
- table.table_path = "/tmp/test_table"
- table.current_branch.return_value = "main"
- table.file_io = Mock()
- table.file_io.exists_batch.return_value = {
+ file_io = Mock()
+ file_io.exists_batch.return_value = {
"/tmp/test_table/snapshot/snapshot-5": True,
"/tmp/test_table/snapshot/snapshot-6": True,
"/tmp/test_table/snapshot/snapshot-7": True,
}
- table.catalog_environment = Mock()
- table.catalog_environment.snapshot_loader.return_value = None
- # Create mock snapshots with different commit kinds
snapshots = {
5: _create_mock_snapshot(5, "COMPACT"),
6: _create_mock_snapshot(6, "COMPACT"),
7: _create_mock_snapshot(7, "APPEND"),
}
- manager = SnapshotManager(table)
+ manager = _build_manager(file_io)
+ manager.get_snapshot_by_id = lambda sid: snapshots.get(sid)
- # Mock get_snapshot_by_id to return our test snapshots
- def mock_get_snapshot(sid):
- return snapshots.get(sid)
-
- manager.get_snapshot_by_id = mock_get_snapshot
-
- # should_scan only accepts APPEND commits
def should_scan(snapshot):
return snapshot.commit_kind == "APPEND"
result, next_id, skipped_count = manager.find_next_scannable(5,
should_scan, lookahead_size=5)
- self.assertEqual(result.id, 7) # First APPEND snapshot
- self.assertEqual(next_id, 8) # Next ID to check
- self.assertEqual(skipped_count, 2) # Skipped snapshots 5 and 6
+ self.assertEqual(result.id, 7)
+ self.assertEqual(next_id, 8)
+ self.assertEqual(skipped_count, 2)
def test_find_next_scannable_returns_none_when_no_snapshot_exists(self):
"""find_next_scannable should return None when no snapshot exists at
start_id."""
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
- table = Mock()
- table.table_path = "/tmp/test_table"
- table.current_branch.return_value = "main"
- table.file_io = Mock()
- # All paths return False (no files exist)
- table.file_io.exists_batch.return_value = {}
- table.catalog_environment = Mock()
- table.catalog_environment.snapshot_loader.return_value = None
+ file_io = Mock()
+ file_io.exists_batch.return_value = {}
- manager = SnapshotManager(table)
+ manager = _build_manager(file_io)
def should_scan(snapshot):
return True
@@ -98,26 +81,17 @@ class SnapshotManagerTest(unittest.TestCase):
result, next_id, skipped_count = manager.find_next_scannable(5,
should_scan, lookahead_size=5)
self.assertIsNone(result)
- self.assertEqual(next_id, 5) # Still at start_id
+ self.assertEqual(next_id, 5)
self.assertEqual(skipped_count, 0)
def test_find_next_scannable_continues_when_all_skipped(self):
"""When all lookahead snapshots are skipped, next_id should be
start+lookahead."""
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
- table = Mock()
- table.table_path = "/tmp/test_table"
- table.current_branch.return_value = "main"
- table.file_io = Mock()
-
- # All 3 snapshots exist but are COMPACT (will be skipped)
- table.file_io.exists_batch.return_value = {
+ file_io = Mock()
+ file_io.exists_batch.return_value = {
"/tmp/test_table/snapshot/snapshot-5": True,
"/tmp/test_table/snapshot/snapshot-6": True,
"/tmp/test_table/snapshot/snapshot-7": True,
}
- table.catalog_environment = Mock()
- table.catalog_environment.snapshot_loader.return_value = None
snapshots = {
5: _create_mock_snapshot(5, "COMPACT"),
@@ -125,21 +99,17 @@ class SnapshotManagerTest(unittest.TestCase):
7: _create_mock_snapshot(7, "COMPACT"),
}
- manager = SnapshotManager(table)
-
- def mock_get_snapshot(sid):
- return snapshots.get(sid)
-
- manager.get_snapshot_by_id = mock_get_snapshot
+ manager = _build_manager(file_io)
+ manager.get_snapshot_by_id = lambda sid: snapshots.get(sid)
def should_scan(snapshot):
return snapshot.commit_kind == "APPEND"
result, next_id, skipped_count = manager.find_next_scannable(5,
should_scan, lookahead_size=3)
- self.assertIsNone(result) # No APPEND found
- self.assertEqual(next_id, 8) # 5 + 3 = 8, continue from here
- self.assertEqual(skipped_count, 3) # All 3 were skipped
+ self.assertIsNone(result)
+ self.assertEqual(next_id, 8)
+ self.assertEqual(skipped_count, 3)
if __name__ == '__main__':
diff --git a/paimon-python/pypaimon/tests/streaming_table_scan_test.py b/paimon-python/pypaimon/tests/streaming_table_scan_test.py
index a0b456027e..7e5386a86f 100644
--- a/paimon-python/pypaimon/tests/streaming_table_scan_test.py
+++ b/paimon-python/pypaimon/tests/streaming_table_scan_test.py
@@ -67,14 +67,14 @@ def _create_mock_table(latest_snapshot_id: int = 5):
class AsyncStreamingTableScanTest(unittest.TestCase):
"""Tests for AsyncStreamingTableScan async streaming functionality."""
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.FileScanner')
- def test_initial_scan(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+ def test_initial_scan(self, MockStartingScanner, MockManifestListManager):
"""Initial scan should yield a Plan and set next_snapshot_id to latest + 1."""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5)
mock_snapshot_manager.get_snapshot_by_id.return_value = None
@@ -91,15 +91,15 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
self.assertIsInstance(plan, Plan)
self.assertEqual(scan.next_snapshot_id, 6)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.FileScanner')
- def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+ def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager):
"""Stream should skip COMPACT/OVERWRITE commits."""
table, _ = _create_mock_table(latest_snapshot_id=7)
# Setup mocks
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
# Snapshots: 6 (COMPACT - skip), 7 (APPEND - scan)
snapshot_7 = _create_mock_snapshot(7, "APPEND")
@@ -137,15 +137,15 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
# Verify lookahead skipped 1 snapshot
self.assertEqual(scan._lookahead_skips, 1)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.FileScanner')
- def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+ def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager):
"""stream_sync() should provide a synchronous iterator."""
table, _ = _create_mock_table(latest_snapshot_id=5)
# Setup mocks
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5)
mock_snapshot_manager.get_snapshot_by_id.return_value = None
@@ -159,9 +159,8 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
self.assertIsInstance(plan, Plan)
break # Just get one
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
- def test_poll_interval_configurable(self, MockManifestListManager, MockSnapshotManager):
+ def test_poll_interval_configurable(self, MockManifestListManager):
"""Poll interval should be configurable."""
table, _ = _create_mock_table()
@@ -169,14 +168,14 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
self.assertEqual(scan.poll_interval, 0.5)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.FileScanner')
- def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+ def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager):
"""When no new snapshot exists, should wait and poll again."""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0,
"cache_misses": 0, "cache_size": 0}
# No snapshot 6 exists yet - find_next_scannable returns (None, 6, 0) first,
@@ -216,36 +215,33 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
class StreamingPrefetchTest(unittest.TestCase):
"""Tests for prefetching functionality in AsyncStreamingTableScan."""
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
- def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager):
+ def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager):
"""Prefetching should be enabled by default."""
table, _ = _create_mock_table()
scan = AsyncStreamingTableScan(table)
self.assertTrue(scan._prefetch_enabled)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
- def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager):
+ def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager):
"""Prefetching can be disabled via constructor parameter."""
table, _ = _create_mock_table()
scan = AsyncStreamingTableScan(table, prefetch_enabled=False)
self.assertFalse(scan._prefetch_enabled)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_prefetch_starts_after_yielding_plan(
self,
MockManifestFileManager,
- MockManifestListManager,
- MockSnapshotManager):
+ MockManifestListManager):
"""After yielding a plan, prefetch for next snapshot should start."""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_manifest_list_manager = MockManifestListManager.return_value
mock_manifest_file_manager = MockManifestFileManager.return_value
mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0,
"cache_misses": 0, "cache_size": 0}
@@ -292,18 +288,17 @@ class StreamingPrefetchTest(unittest.TestCase):
plans = asyncio.run(get_two_plans())
self.assertEqual(len(plans), 2)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_prefetch_returns_same_data_as_sequential(
self,
MockManifestFileManager,
- MockManifestListManager,
- MockSnapshotManager):
+ MockManifestListManager):
"""Prefetched plans should contain the same data as non-prefetched."""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_manifest_list_manager = MockManifestListManager.return_value
mock_manifest_file_manager = MockManifestFileManager.return_value
mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0,
"cache_misses": 0, "cache_size": 0}
@@ -346,18 +341,17 @@ class StreamingPrefetchTest(unittest.TestCase):
# Both should get the same number of plans
self.assertEqual(len(plans_prefetch), len(plans_sequential))
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_prefetch_handles_no_next_snapshot(
self,
MockManifestFileManager,
- MockManifestListManager,
- MockSnapshotManager):
+ MockManifestListManager):
"""When no next snapshot exists, prefetch should return None
gracefully."""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_manifest_list_manager = MockManifestListManager.return_value
mock_manifest_file_manager = MockManifestFileManager.return_value
mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0,
"cache_misses": 0, "cache_size": 0}
@@ -391,18 +385,17 @@ class StreamingPrefetchTest(unittest.TestCase):
plan = asyncio.run(get_one_plan())
self.assertIsInstance(plan, Plan)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_prefetch_disabled_no_prefetch_future(
self,
MockManifestFileManager,
- MockManifestListManager,
- MockSnapshotManager):
+ MockManifestListManager):
"""With prefetch disabled, no prefetch future should be created."""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_manifest_list_manager = MockManifestListManager.return_value
mock_manifest_file_manager = MockManifestFileManager.return_value
mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0,
"cache_misses": 0, "cache_size": 0}
@@ -437,12 +430,11 @@ class StreamingCatchUpDiffTest(unittest.TestCase):
"""Tests for diff-based catch-up optimization in
AsyncStreamingTableScan."""
@patch('pypaimon.read.streaming_table_scan.IncrementalDiffScanner')
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_stream_triggers_diff_catch_up_for_large_gap(
self, MockManifestFileManager, MockManifestListManager,
- MockSnapshotManager, MockDiffScanner
+ MockDiffScanner
):
"""
When starting with a large gap, stream() should use diff scanner.
@@ -454,7 +446,8 @@ class StreamingCatchUpDiffTest(unittest.TestCase):
"""
table, _ = _create_mock_table(latest_snapshot_id=100)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_diff_scanner = MockDiffScanner.return_value
# Setup: latest is 100, start is 5 (gap=95)
@@ -493,19 +486,19 @@ class StreamingConsumerTest(unittest.TestCase):
"""Tests for consumer management integration in AsyncStreamingTableScan."""
@patch('pypaimon.read.streaming_table_scan.ConsumerManager')
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_consumer_restores_next_snapshot_id(
self, MockManifestFileManager, MockManifestListManager,
- MockSnapshotManager, MockConsumerManager
+ MockConsumerManager
):
"""When consumer exists, stream() should resume from saved position."""
from pypaimon.consumer.consumer import Consumer
table, _ = _create_mock_table(latest_snapshot_id=10)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_consumer_manager = MockConsumerManager.return_value
mock_manifest_list_manager = MockManifestListManager.return_value
@@ -541,12 +534,11 @@ class StreamingConsumerTest(unittest.TestCase):
self.assertEqual(scan.next_snapshot_id, 9)
@patch('pypaimon.read.streaming_table_scan.ConsumerManager')
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.FileScanner')
def test_consumer_saves_after_yield(
self, MockFileScanner, MockManifestListManager,
- MockSnapshotManager, MockConsumerManager
+ MockConsumerManager
):
"""Consumer progress is flushed on the next __anext__() call after
yielding a plan.
@@ -556,7 +548,8 @@ class StreamingConsumerTest(unittest.TestCase):
"""
table, _ = _create_mock_table(latest_snapshot_id=5)
- mock_snapshot_manager = MockSnapshotManager.return_value
+ mock_snapshot_manager = Mock()
+ table.snapshot_manager.return_value = mock_snapshot_manager
mock_consumer_manager = MockConsumerManager.return_value
# No existing consumer state
@@ -597,12 +590,10 @@ class StreamingConsumerTest(unittest.TestCase):
self.assertEqual(call_args[0][0], "save-test")
self.assertEqual(call_args[0][1].next_snapshot, 6)
- @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
@patch('pypaimon.read.streaming_table_scan.ManifestListManager')
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
def test_no_consumer_when_consumer_id_not_set(
- self, MockManifestFileManager, MockManifestListManager,
- MockSnapshotManager
+ self, MockManifestFileManager, MockManifestListManager
):
"""Without consumer_id, no ConsumerManager should be created."""
table, _ = _create_mock_table()
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 97d4ed9298..6eb7f5f5da 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -34,7 +34,6 @@ from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
SnapshotCommit)
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.offset_row import OffsetRow
from pypaimon.write.commit.commit_rollback import CommitRollback
@@ -85,7 +84,7 @@ class FileStoreCommit:
self.table: FileStoreTable = table
self.commit_user = commit_user
- self.snapshot_manager = SnapshotManager(table)
+ self.snapshot_manager = table.snapshot_manager()
self.manifest_file_manager = ManifestFileManager(table)
self.manifest_list_manager = ManifestListManager(table)