This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aec277912 [python] Align SnapshotManager constructor with Java (#7758)
4aec277912 is described below

commit 4aec277912538a92d030492d563970fbbbb50a4e
Author: chaoyang <[email protected]>
AuthorDate: Fri May 8 11:19:01 2026 +0800

    [python] Align SnapshotManager constructor with Java (#7758)
    
    Switch the Python `SnapshotManager.__init__` from `(table)` to the
    Java-aligned `(file_io, table_path, branch=None, snapshot_loader=None)`,
    so the class no longer depends on `FileStoreTable`. Mirrors
    `paimon-core/.../utils/SnapshotManager.java`'s constructor (which takes
    `FileIO, Path tablePath, @Nullable String branchName, @Nullable
    SnapshotLoader, @Nullable Cache`); the `Cache` parameter is left out of
    scope (Caffeine has no Python equivalent yet).
    
    The previous Python signature held `self.table = table` and reached
    through `self.table.file_io` /
    `self.table.catalog_environment.snapshot_loader()` /
    `self.table.current_branch()` / `self.table.table_path` to populate
    its fields. That made `SnapshotManager` a layering violation -- the
    snapshot layer reaching back into the table layer -- and forced mock
    tests to stub a four-attribute mock table just to construct the
    manager. The new constructor takes those four values directly.
---
 .../acceptance/incremental_diff_acceptance_test.py |  5 +-
 .../pypaimon/globalindex/global_index_scanner.py   |  3 +-
 .../pypaimon/read/scanner/file_scanner.py          |  3 +-
 .../pypaimon/read/streaming_table_scan.py          |  3 +-
 paimon-python/pypaimon/read/table_scan.py          |  3 +-
 .../pypaimon/snapshot/snapshot_manager.py          | 37 +++++-----
 paimon-python/pypaimon/table/file_store_table.py   |  7 +-
 .../pypaimon/table/source/full_text_scan.py        |  3 +-
 .../pypaimon/table/source/vector_search_scan.py    |  3 +-
 paimon-python/pypaimon/tests/blob_table_test.py    |  3 +-
 .../pypaimon/tests/branch_manager_test.py          |  4 +-
 .../pypaimon/tests/changelog_manager_test.py       |  3 +-
 .../pypaimon/tests/data_evolution_test.py          |  3 +-
 .../pypaimon/tests/file_store_commit_test.py       | 19 +++--
 .../pypaimon/tests/partition_predicate_test.py     |  2 -
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |  5 +-
 .../pypaimon/tests/reader_append_only_test.py      | 11 ++-
 paimon-python/pypaimon/tests/reader_base_test.py   |  9 ++-
 .../pypaimon/tests/reader_primary_key_test.py      |  9 ++-
 .../pypaimon/tests/snapshot_manager_test.py        | 76 ++++++--------------
 .../pypaimon/tests/streaming_table_scan_test.py    | 83 ++++++++++------------
 paimon-python/pypaimon/write/file_store_commit.py  |  3 +-
 22 files changed, 124 insertions(+), 173 deletions(-)

diff --git 
a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py 
b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
index 0dc7ebf53f..a6a56850ff 100644
--- a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
+++ b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py
@@ -38,7 +38,6 @@ from pypaimon.read.scanner.append_table_split_generator 
import \
     AppendTableSplitGenerator
 from pypaimon.read.scanner.incremental_diff_scanner import \
     IncrementalDiffScanner
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class IncrementalDiffAcceptanceTest(unittest.TestCase):
@@ -94,7 +93,7 @@ class IncrementalDiffAcceptanceTest(unittest.TestCase):
 
     def _read_via_diff(self, table, start_snap_id, end_snap_id):
         """Read data using IncrementalDiffScanner between two snapshots."""
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id)
         end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id)
 
@@ -106,7 +105,7 @@ class IncrementalDiffAcceptanceTest(unittest.TestCase):
 
     def _read_via_delta(self, table, start_snap_id, end_snap_id):
         """Read data by iterating delta_manifest_lists between two 
snapshots."""
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         manifest_list_manager = ManifestListManager(table)
         manifest_file_manager = ManifestFileManager(table)
 
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py 
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index 38408059a9..c51be0731b 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -111,8 +111,7 @@ class GlobalIndexScanner:
                 return False
             return global_index_meta.index_field_id in filter_field_ids
 
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
-        snapshot = SnapshotManager(table).get_latest_snapshot()
+        snapshot = table.snapshot_manager().get_latest_snapshot()
         index_file_handler = IndexFileHandler(table=table)
         entries = index_file_handler.scan(snapshot, index_file_filter)
         scanned_index_files = [entry.index_file for entry in entries]
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index ffbd83daf0..39c7401746 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -41,7 +41,6 @@ from pypaimon.read.scanner.primary_key_table_split_generator 
import \
     PrimaryKeyTableSplitGenerator
 from pypaimon.read.split import DataSplit
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.source.deletion_file import DeletionFile
 
@@ -179,7 +178,7 @@ class FileScanner:
         self.predicate_for_stats = remove_row_id_filter(predicate) if 
predicate else None
         self.limit = limit
 
-        self.snapshot_manager = SnapshotManager(table)
+        self.snapshot_manager = table.snapshot_manager()
         self.manifest_list_manager = ManifestListManager(table)
         self.manifest_file_manager = ManifestFileManager(table)
 
diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py 
b/paimon-python/pypaimon/read/streaming_table_scan.py
index f426d2064f..f7f8ca4182 100644
--- a/paimon-python/pypaimon/read/streaming_table_scan.py
+++ b/paimon-python/pypaimon/read/streaming_table_scan.py
@@ -48,7 +48,6 @@ from pypaimon.read.scanner.incremental_diff_scanner import \
 from pypaimon.read.scanner.primary_key_table_split_generator import \
     PrimaryKeyTableSplitGenerator
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class AsyncStreamingTableScan:
@@ -103,7 +102,7 @@ class AsyncStreamingTableScan:
         self._lookahead_size = 10  # How many snapshots to look ahead
 
         # Initialize managers
-        self._snapshot_manager = SnapshotManager(table)
+        self._snapshot_manager = table.snapshot_manager()
         self._manifest_list_manager = ManifestListManager(table)
         self._manifest_file_manager = ManifestFileManager(table)
 
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index c754b50111..562bea26f5 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -23,7 +23,6 @@ from pypaimon.common.predicate import Predicate
 
 from pypaimon.read.plan import Plan
 from pypaimon.read.scanner.file_scanner import FileScanner
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 
 
@@ -48,7 +47,7 @@ class TableScan:
 
     def _create_file_scanner(self) -> FileScanner:
         options = self.table.options.options
-        snapshot_manager = SnapshotManager(self.table)
+        snapshot_manager = self.table.snapshot_manager()
         manifest_list_manager = ManifestListManager(self.table)
         if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
             ts = 
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py 
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 074fc844cd..6f2bef7225 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -29,24 +29,24 @@ from pypaimon.snapshot.snapshot_loader import SnapshotLoader
 class SnapshotManager:
     """Manager for snapshot files using unified FileIO."""
 
-    def __init__(self, table, branch: Optional[str] = None):
-        # Lazy imports to avoid a cycle: pypaimon.branch.__init__
+    def __init__(
+        self,
+        file_io: FileIO,
+        table_path: str,
+        branch: Optional[str] = None,
+        snapshot_loader: Optional[SnapshotLoader] = None,
+    ):
+        # Lazy import to avoid a cycle: pypaimon.branch.__init__
         # eagerly loads FileSystemBranchManager, which imports
         # SnapshotManager.
         from pypaimon.branch.branch_manager import BranchManager
-        from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
-        from pypaimon.table.file_store_table import FileStoreTable
-
-        self.table: FileStoreTable = table
-        self.file_io: FileIO = self.table.file_io
-        self.snapshot_loader: Optional[SnapshotLoader] = 
self.table.catalog_environment.snapshot_loader()
 
-        if branch is None:
-            branch = self.table.current_branch() or DEFAULT_MAIN_BRANCH
-        self.branch = BranchManager.normalize_branch(branch)
+        self.file_io: FileIO = file_io
+        self.table_path: str = table_path.rstrip('/')
+        self.branch: str = BranchManager.normalize_branch(branch)
+        self.snapshot_loader: Optional[SnapshotLoader] = snapshot_loader
 
-        table_root = self.table.table_path.rstrip('/')
-        branch_root = BranchManager.branch_path(table_root, self.branch)
+        branch_root = BranchManager.branch_path(self.table_path, self.branch)
         self.snapshot_dir = f"{branch_root}/snapshot"
         self.latest_file = f"{self.snapshot_dir}/LATEST"
 
@@ -55,10 +55,13 @@ class SnapshotManager:
         # carries a SnapshotLoader rebranched to ``branch_name`` so REST
         # loads target the correct branch instead of falling back to the
         # main-branch identifier.
-        new_manager = SnapshotManager(self.table, branch_name)
-        if self.snapshot_loader is not None:
-            new_manager.snapshot_loader = 
self.snapshot_loader.copy_with_branch(branch_name)
-        return new_manager
+        rebranched_loader = (
+            self.snapshot_loader.copy_with_branch(branch_name)
+            if self.snapshot_loader is not None
+            else None
+        )
+        return SnapshotManager(
+            self.file_io, self.table_path, branch_name, rebranched_loader)
 
     def get_latest_snapshot(self) -> Optional[Snapshot]:
         """
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index 4b49a62869..35addad251 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -98,7 +98,12 @@ class FileStoreTable(Table):
     def snapshot_manager(self):
         """Get the snapshot manager for this table."""
         from pypaimon.snapshot.snapshot_manager import SnapshotManager
-        return SnapshotManager(self)
+        return SnapshotManager(
+            self.file_io,
+            self.table_path,
+            self.current_branch(),
+            self.catalog_environment.snapshot_loader(),
+        )
 
     def tag_manager(self):
         """Get the tag manager for this table."""
diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py 
b/paimon-python/pypaimon/table/source/full_text_scan.py
index 06f72405b3..50b2381d27 100644
--- a/paimon-python/pypaimon/table/source/full_text_scan.py
+++ b/paimon-python/pypaimon/table/source/full_text_scan.py
@@ -53,10 +53,9 @@ class FullTextScanImpl(FullTextScan):
 
     def scan(self) -> FullTextScanPlan:
         from pypaimon.index.index_file_handler import IndexFileHandler
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
         text_column = self._text_column
-        snapshot = SnapshotManager(self._table).get_latest_snapshot()
+        snapshot = self._table.snapshot_manager().get_latest_snapshot()
 
         from pypaimon.snapshot.time_travel_util import TimeTravelUtil
         from pypaimon.common.options.options import Options
diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py 
b/paimon-python/pypaimon/table/source/vector_search_scan.py
index 5245d41f19..035e51a846 100644
--- a/paimon-python/pypaimon/table/source/vector_search_scan.py
+++ b/paimon-python/pypaimon/table/source/vector_search_scan.py
@@ -60,7 +60,6 @@ class VectorSearchScanImpl(VectorSearchScan):
         from pypaimon.common.options.options import Options
         from pypaimon.index.index_file_handler import IndexFileHandler
         from pypaimon.read.push_down_utils import _get_all_fields
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
         from pypaimon.snapshot.time_travel_util import TimeTravelUtil
 
         vector_column = self._vector_column
@@ -81,7 +80,7 @@ class VectorSearchScanImpl(VectorSearchScan):
             self._table.tag_manager()
         )
         if snapshot is None:
-            snapshot = SnapshotManager(self._table).get_latest_snapshot()
+            snapshot = self._table.snapshot_manager().get_latest_snapshot()
 
         index_file_handler = IndexFileHandler(table=self._table)
 
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7e4d6c6d26..2692aa2ea0 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2748,7 +2748,6 @@ class DataBlobWriterTest(unittest.TestCase):
         """Test concurrent blob writes to verify retry mechanism works 
correctly."""
         import threading
         from pypaimon import Schema
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
         # Run the test 10 times to verify stability
         iter_num = 2
@@ -2873,7 +2872,7 @@ class DataBlobWriterTest(unittest.TestCase):
                 self.assertIn(b'BLOB_PATTERN_', blob, f"Blob {i} should 
contain pattern")
 
             # Verify snapshot count (should have num_threads snapshots)
-            snapshot_manager = SnapshotManager(table)
+            snapshot_manager = table.snapshot_manager()
             latest_snapshot = snapshot_manager.get_latest_snapshot()
             self.assertIsNotNone(latest_snapshot,
                                  f"Iteration {test_iteration}: Latest snapshot 
should not be None")
diff --git a/paimon-python/pypaimon/tests/branch_manager_test.py 
b/paimon-python/pypaimon/tests/branch_manager_test.py
index a9419825a7..76a4a287c9 100644
--- a/paimon-python/pypaimon/tests/branch_manager_test.py
+++ b/paimon-python/pypaimon/tests/branch_manager_test.py
@@ -243,8 +243,8 @@ class SnapshotManagerBranchAwarenessTest(unittest.TestCase):
         self.assertIsNot(branch_sm.snapshot_loader, sm.snapshot_loader)
         self.assertEqual(branch_sm.snapshot_loader.identifier.branch, "b1")
         self.assertEqual(
-            branch_sm.snapshot_loader.identifier.database,
-            sm.snapshot_loader.identifier.database)
+            branch_sm.snapshot_loader.identifier.get_database_name(),
+            sm.snapshot_loader.identifier.get_database_name())
         self.assertEqual(
             branch_sm.snapshot_loader.identifier.get_table_name(),
             sm.snapshot_loader.identifier.get_table_name())
diff --git a/paimon-python/pypaimon/tests/changelog_manager_test.py 
b/paimon-python/pypaimon/tests/changelog_manager_test.py
index 63d9157e5e..24c079316a 100644
--- a/paimon-python/pypaimon/tests/changelog_manager_test.py
+++ b/paimon-python/pypaimon/tests/changelog_manager_test.py
@@ -121,9 +121,8 @@ class TestChangelogManager(unittest.TestCase):
 
     def test_changelog_from_snapshot(self):
         """Test that Changelog can be created from a Snapshot."""
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
-        snapshot_manager = SnapshotManager(self.table)
+        snapshot_manager = self.table.snapshot_manager()
         snapshot = snapshot_manager.get_latest_snapshot()
 
         if snapshot:
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index b9ca4c7acc..edd3ac9007 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -29,7 +29,6 @@ import pyarrow.dataset as ds
 from pypaimon import CatalogFactory, Schema
 from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.offset_row import OffsetRow
 
 
@@ -192,7 +191,7 @@ class DataEvolutionTest(unittest.TestCase):
 
         # Assert manifest file meta contains min and max row id
         manifest_list_manager = ManifestListManager(table)
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         all_manifests = 
manifest_list_manager.read_all(snapshot_manager.get_latest_snapshot())
         first_commit = next((m for m in all_manifests if m.min_row_id == 0 and 
m.max_row_id == 1), None)
         self.assertIsNotNone(first_commit, "Should have a manifest with 
min_row_id=0, max_row_id=1")
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py 
b/paimon-python/pypaimon/tests/file_store_commit_test.py
index 958ea85a6b..d4943a6952 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -28,7 +28,6 @@ from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_commit import FileStoreCommit
 
 
-@patch('pypaimon.write.file_store_commit.SnapshotManager')
 @patch('pypaimon.write.file_store_commit.ManifestFileManager')
 @patch('pypaimon.write.file_store_commit.ManifestListManager')
 class TestFileStoreCommit(unittest.TestCase):
@@ -55,7 +54,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
     def test_generate_partition_statistics_single_partition_single_file(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation with single partition and 
single file."""
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
@@ -107,7 +106,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat.last_file_creation_time, expected_time)
 
     def test_generate_partition_statistics_multiple_files_same_partition(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation with multiple files in same 
partition."""
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
@@ -171,7 +170,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat.last_file_creation_time, expected_time)
 
     def test_generate_partition_statistics_multiple_partitions(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation with multiple different 
partitions."""
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
@@ -259,7 +258,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat_2.file_size_in_bytes, 2 * 1024 * 1024)
 
     def test_generate_partition_statistics_unpartitioned_table(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation for unpartitioned table."""
         # Update mock table to have no partition keys
         self.mock_table.partition_keys = []
@@ -310,7 +309,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
 
     def test_generate_partition_statistics_no_creation_time(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation when file has no creation 
time."""
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
@@ -347,7 +346,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertGreater(stat.last_file_creation_time, 0)
 
     def test_generate_partition_statistics_mismatched_partition_keys(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation when partition tuple doesn't 
match partition keys."""
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
@@ -393,7 +392,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(stat.spec, expected_spec)
 
     def test_generate_partition_statistics_empty_commit_messages(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         """Test partition statistics generation with empty commit messages 
list."""
         # Create FileStoreCommit instance
         file_store_commit = self._create_file_store_commit()
@@ -405,7 +404,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.assertEqual(len(statistics), 0)
 
     def test_append_commit_inherits_index_manifest(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         file_store_commit = self._create_file_store_commit()
 
         self.mock_table.identifier = 'default.test_table'
@@ -448,7 +447,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
     def test_null_partition_value(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, 
mock_snapshot_manager):
+            self, mock_manifest_list_manager, mock_manifest_file_manager):
         from pypaimon.data.timestamp import Timestamp
         from pypaimon.manifest.schema.simple_stats import SimpleStats
         from pypaimon.schema.data_types import DataField, AtomicType
diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py 
b/paimon-python/pypaimon/tests/partition_predicate_test.py
index a9c9482373..1dbc9a150f 100644
--- a/paimon-python/pypaimon/tests/partition_predicate_test.py
+++ b/paimon-python/pypaimon/tests/partition_predicate_test.py
@@ -96,7 +96,6 @@ def _manifest_entry(partition_values):
     )
 
 
-@patch('pypaimon.read.scanner.file_scanner.SnapshotManager')
 @patch('pypaimon.read.scanner.file_scanner.ManifestFileManager')
 @patch('pypaimon.read.scanner.file_scanner.ManifestListManager')
 class TestFileScannerPartitionPredicate(unittest.TestCase):
@@ -154,7 +153,6 @@ class TestFileScannerPartitionPredicate(unittest.TestCase):
             _manifest_entry(['2024-01-15', 'us-west-2'])))
 
 
-@patch('pypaimon.write.file_store_commit.SnapshotManager')
 @patch('pypaimon.write.file_store_commit.ManifestFileManager')
 @patch('pypaimon.write.file_store_commit.ManifestListManager')
 class TestOverwritePartitionPredicate(unittest.TestCase):
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py 
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index cfdf33b755..5258aec1d0 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -37,7 +37,6 @@ from pypaimon.manifest.schema.data_file_meta import 
DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
                                             GenericRowSerializer)
 from pypaimon.table.row.row_kind import RowKind
@@ -180,7 +179,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         self.assertEqual(actual_data, expect_data)
 
         # to test GenericRow ability
-        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        latest_snapshot = table.snapshot_manager().get_latest_snapshot()
         manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name,
@@ -771,7 +770,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         timestamp = int(time.time() * 1000)
         self._write_test_table(table)
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
         t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
         # test 1
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 00b2f803f7..d922cb2e30 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -31,7 +31,6 @@ from pypaimon import CatalogFactory, Schema
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.write.file_store_commit import RetryResult
 
@@ -137,7 +136,7 @@ class AoReaderTest(unittest.TestCase):
         table_write.close()
         table_commit.close()
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         snapshot = snapshot_manager.get_latest_snapshot()
         table_inc = table.copy({
             CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key():
@@ -478,7 +477,7 @@ class AoReaderTest(unittest.TestCase):
         table_commit.commit(messages)
         table_write.close()
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         latest_snapshot = snapshot_manager.get_latest_snapshot()
         commit_entries = []
         for msg in messages:
@@ -673,7 +672,7 @@ class AoReaderTest(unittest.TestCase):
         timestamp = int(time.time() * 1000)
         self._write_test_table(table)
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
         t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
         # test 1
@@ -713,7 +712,7 @@ class AoReaderTest(unittest.TestCase):
             table_write.close()
             table_commit.close()
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
         t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
 
@@ -820,7 +819,7 @@ class AoReaderTest(unittest.TestCase):
                              f"Iteration {test_iteration}: User IDs mismatch")
 
             # Verify snapshot count (should have num_threads snapshots)
-            snapshot_manager = SnapshotManager(table)
+            snapshot_manager = table.snapshot_manager()
             latest_snapshot = snapshot_manager.get_latest_snapshot()
             self.assertIsNotNone(latest_snapshot,
                                  f"Iteration {test_iteration}: Latest snapshot 
should not be None")
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py 
b/paimon-python/pypaimon/tests/reader_base_test.py
index 657678f9ea..f1b147d147 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -38,7 +38,6 @@ from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
                                         MapType, PyarrowFieldParser)
 from pypaimon.schema.table_schema import TableSchema
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
 from pypaimon.write.file_store_commit import FileStoreCommit
 
@@ -226,7 +225,7 @@ class ReaderBasicTest(unittest.TestCase):
         self.assertEqual(actual_data, expect_data)
 
         # to test GenericRow ability
-        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        latest_snapshot = table.snapshot_manager().get_latest_snapshot()
         manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name, lambda row: 
table_scan.file_scanner._filter_manifest_entry(row), False)
@@ -517,7 +516,7 @@ class ReaderBasicTest(unittest.TestCase):
 
         pk_read_builder = pk_table.new_read_builder()
         pk_table_scan = pk_read_builder.new_scan()
-        latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
+        latest_snapshot = pk_table.snapshot_manager().get_latest_snapshot()
         pk_manifest_files = 
pk_table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
         pk_manifest_entries = 
pk_table_scan.file_scanner.manifest_file_manager.read(
             pk_manifest_files[0].file_name,
@@ -592,7 +591,7 @@ class ReaderBasicTest(unittest.TestCase):
 
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
-        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        latest_snapshot = table.snapshot_manager().get_latest_snapshot()
         manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name,
@@ -1105,7 +1104,7 @@ class ReaderBasicTest(unittest.TestCase):
         # Read manifest to verify value_stats_cols is None (all fields included)
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
-        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        latest_snapshot = table.snapshot_manager().get_latest_snapshot()
         manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name,
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 541dabe895..36ec678c8c 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -26,7 +26,6 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
 from pypaimon.common.options.core_options import CoreOptions
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class PkReaderTest(unittest.TestCase):
@@ -339,7 +338,7 @@ class PkReaderTest(unittest.TestCase):
         timestamp = int(time.time() * 1000)
         self._write_test_table(table)
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
         t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
         # test 1
@@ -386,7 +385,7 @@ class PkReaderTest(unittest.TestCase):
             table_write.close()
             table_commit.close()
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
         t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
 
@@ -412,7 +411,7 @@ class PkReaderTest(unittest.TestCase):
 
         self._write_test_table(table)
 
-        snapshot_manager = SnapshotManager(table)
+        snapshot_manager = table.snapshot_manager()
         latest_snapshot = snapshot_manager.get_latest_snapshot()
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
@@ -575,7 +574,7 @@ class PkReaderTest(unittest.TestCase):
                              f"Iteration {test_iteration}: User IDs mismatch")
 
             # Verify snapshot count (should have num_threads snapshots)
-            snapshot_manager = SnapshotManager(table)
+            snapshot_manager = table.snapshot_manager()
             latest_snapshot = snapshot_manager.get_latest_snapshot()
             self.assertIsNotNone(latest_snapshot,
                                  f"Iteration {test_iteration}: Latest snapshot should not be None")
diff --git a/paimon-python/pypaimon/tests/snapshot_manager_test.py b/paimon-python/pypaimon/tests/snapshot_manager_test.py
index 0d1f1b1388..d2ac8360c5 100644
--- a/paimon-python/pypaimon/tests/snapshot_manager_test.py
+++ b/paimon-python/pypaimon/tests/snapshot_manager_test.py
@@ -33,64 +33,47 @@ def _create_mock_snapshot(snapshot_id: int, commit_kind: str = "APPEND"):
     return snapshot
 
 
+def _build_manager(file_io):
+    from pypaimon.snapshot.snapshot_manager import SnapshotManager
+    return SnapshotManager(file_io, "/tmp/test_table")
+
+
 class SnapshotManagerTest(unittest.TestCase):
     """Tests for SnapshotManager batch lookahead methods."""
 
     def test_find_next_scannable_returns_first_matching(self):
         """find_next_scannable should return the first snapshot that passes should_scan."""
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-        table = Mock()
-        table.table_path = "/tmp/test_table"
-        table.current_branch.return_value = "main"
-        table.file_io = Mock()
-        table.file_io.exists_batch.return_value = {
+        file_io = Mock()
+        file_io.exists_batch.return_value = {
             "/tmp/test_table/snapshot/snapshot-5": True,
             "/tmp/test_table/snapshot/snapshot-6": True,
             "/tmp/test_table/snapshot/snapshot-7": True,
         }
-        table.catalog_environment = Mock()
-        table.catalog_environment.snapshot_loader.return_value = None
 
-        # Create mock snapshots with different commit kinds
         snapshots = {
             5: _create_mock_snapshot(5, "COMPACT"),
             6: _create_mock_snapshot(6, "COMPACT"),
             7: _create_mock_snapshot(7, "APPEND"),
         }
 
-        manager = SnapshotManager(table)
+        manager = _build_manager(file_io)
+        manager.get_snapshot_by_id = lambda sid: snapshots.get(sid)
 
-        # Mock get_snapshot_by_id to return our test snapshots
-        def mock_get_snapshot(sid):
-            return snapshots.get(sid)
-
-        manager.get_snapshot_by_id = mock_get_snapshot
-
-        # should_scan only accepts APPEND commits
         def should_scan(snapshot):
             return snapshot.commit_kind == "APPEND"
 
         result, next_id, skipped_count = manager.find_next_scannable(5, should_scan, lookahead_size=5)
 
-        self.assertEqual(result.id, 7)  # First APPEND snapshot
-        self.assertEqual(next_id, 8)    # Next ID to check
-        self.assertEqual(skipped_count, 2)  # Skipped snapshots 5 and 6
+        self.assertEqual(result.id, 7)
+        self.assertEqual(next_id, 8)
+        self.assertEqual(skipped_count, 2)
 
     def test_find_next_scannable_returns_none_when_no_snapshot_exists(self):
         """find_next_scannable should return None when no snapshot exists at start_id."""
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-        table = Mock()
-        table.table_path = "/tmp/test_table"
-        table.current_branch.return_value = "main"
-        table.file_io = Mock()
-        # All paths return False (no files exist)
-        table.file_io.exists_batch.return_value = {}
-        table.catalog_environment = Mock()
-        table.catalog_environment.snapshot_loader.return_value = None
+        file_io = Mock()
+        file_io.exists_batch.return_value = {}
 
-        manager = SnapshotManager(table)
+        manager = _build_manager(file_io)
 
         def should_scan(snapshot):
             return True
@@ -98,26 +81,17 @@ class SnapshotManagerTest(unittest.TestCase):
         result, next_id, skipped_count = manager.find_next_scannable(5, should_scan, lookahead_size=5)
 
         self.assertIsNone(result)
-        self.assertEqual(next_id, 5)  # Still at start_id
+        self.assertEqual(next_id, 5)
         self.assertEqual(skipped_count, 0)
 
     def test_find_next_scannable_continues_when_all_skipped(self):
         """When all lookahead snapshots are skipped, next_id should be start+lookahead."""
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-        table = Mock()
-        table.table_path = "/tmp/test_table"
-        table.current_branch.return_value = "main"
-        table.file_io = Mock()
-
-        # All 3 snapshots exist but are COMPACT (will be skipped)
-        table.file_io.exists_batch.return_value = {
+        file_io = Mock()
+        file_io.exists_batch.return_value = {
             "/tmp/test_table/snapshot/snapshot-5": True,
             "/tmp/test_table/snapshot/snapshot-6": True,
             "/tmp/test_table/snapshot/snapshot-7": True,
         }
-        table.catalog_environment = Mock()
-        table.catalog_environment.snapshot_loader.return_value = None
 
         snapshots = {
             5: _create_mock_snapshot(5, "COMPACT"),
@@ -125,21 +99,17 @@ class SnapshotManagerTest(unittest.TestCase):
             7: _create_mock_snapshot(7, "COMPACT"),
         }
 
-        manager = SnapshotManager(table)
-
-        def mock_get_snapshot(sid):
-            return snapshots.get(sid)
-
-        manager.get_snapshot_by_id = mock_get_snapshot
+        manager = _build_manager(file_io)
+        manager.get_snapshot_by_id = lambda sid: snapshots.get(sid)
 
         def should_scan(snapshot):
             return snapshot.commit_kind == "APPEND"
 
         result, next_id, skipped_count = manager.find_next_scannable(5, should_scan, lookahead_size=3)
 
-        self.assertIsNone(result)  # No APPEND found
-        self.assertEqual(next_id, 8)  # 5 + 3 = 8, continue from here
-        self.assertEqual(skipped_count, 3)  # All 3 were skipped
+        self.assertIsNone(result)
+        self.assertEqual(next_id, 8)
+        self.assertEqual(skipped_count, 3)
 
 
 if __name__ == '__main__':
diff --git a/paimon-python/pypaimon/tests/streaming_table_scan_test.py b/paimon-python/pypaimon/tests/streaming_table_scan_test.py
index a0b456027e..7e5386a86f 100644
--- a/paimon-python/pypaimon/tests/streaming_table_scan_test.py
+++ b/paimon-python/pypaimon/tests/streaming_table_scan_test.py
@@ -67,14 +67,14 @@ def _create_mock_table(latest_snapshot_id: int = 5):
 class AsyncStreamingTableScanTest(unittest.TestCase):
     """Tests for AsyncStreamingTableScan async streaming functionality."""
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.FileScanner')
-    def test_initial_scan(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+    def test_initial_scan(self, MockStartingScanner, MockManifestListManager):
         """Initial scan should yield a Plan and set next_snapshot_id to latest + 1."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5)
         mock_snapshot_manager.get_snapshot_by_id.return_value = None
 
@@ -91,15 +91,15 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
         self.assertIsInstance(plan, Plan)
         self.assertEqual(scan.next_snapshot_id, 6)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.FileScanner')
-    def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+    def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager):
         """Stream should skip COMPACT/OVERWRITE commits."""
         table, _ = _create_mock_table(latest_snapshot_id=7)
 
         # Setup mocks
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
 
         # Snapshots: 6 (COMPACT - skip), 7 (APPEND - scan)
         snapshot_7 = _create_mock_snapshot(7, "APPEND")
@@ -137,15 +137,15 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
         # Verify lookahead skipped 1 snapshot
         self.assertEqual(scan._lookahead_skips, 1)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.FileScanner')
-    def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+    def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager):
         """stream_sync() should provide a synchronous iterator."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
         # Setup mocks
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5)
         mock_snapshot_manager.get_snapshot_by_id.return_value = None
 
@@ -159,9 +159,8 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
             self.assertIsInstance(plan, Plan)
             break  # Just get one
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
-    def test_poll_interval_configurable(self, MockManifestListManager, MockSnapshotManager):
+    def test_poll_interval_configurable(self, MockManifestListManager):
         """Poll interval should be configurable."""
         table, _ = _create_mock_table()
 
@@ -169,14 +168,14 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
 
         self.assertEqual(scan.poll_interval, 0.5)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.FileScanner')
-    def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager):
+    def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager):
         """When no new snapshot exists, should wait and poll again."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0}
 
         # No snapshot 6 exists yet - find_next_scannable returns (None, 6, 0) first,
@@ -216,36 +215,33 @@ class AsyncStreamingTableScanTest(unittest.TestCase):
 class StreamingPrefetchTest(unittest.TestCase):
     """Tests for prefetching functionality in AsyncStreamingTableScan."""
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
-    def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager):
+    def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager):
         """Prefetching should be enabled by default."""
         table, _ = _create_mock_table()
         scan = AsyncStreamingTableScan(table)
         self.assertTrue(scan._prefetch_enabled)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
-    def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager):
+    def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager):
         """Prefetching can be disabled via constructor parameter."""
         table, _ = _create_mock_table()
         scan = AsyncStreamingTableScan(table, prefetch_enabled=False)
         self.assertFalse(scan._prefetch_enabled)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_prefetch_starts_after_yielding_plan(
             self,
             MockManifestFileManager,
-            MockManifestListManager,
-            MockSnapshotManager):
+            MockManifestListManager):
         """After yielding a plan, prefetch for next snapshot should start."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_manifest_list_manager = MockManifestListManager.return_value
         mock_manifest_file_manager = MockManifestFileManager.return_value
         mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0}
@@ -292,18 +288,17 @@ class StreamingPrefetchTest(unittest.TestCase):
         plans = asyncio.run(get_two_plans())
         self.assertEqual(len(plans), 2)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_prefetch_returns_same_data_as_sequential(
             self,
             MockManifestFileManager,
-            MockManifestListManager,
-            MockSnapshotManager):
+            MockManifestListManager):
         """Prefetched plans should contain the same data as non-prefetched."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_manifest_list_manager = MockManifestListManager.return_value
         mock_manifest_file_manager = MockManifestFileManager.return_value
         mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0}
@@ -346,18 +341,17 @@ class StreamingPrefetchTest(unittest.TestCase):
         # Both should get the same number of plans
         self.assertEqual(len(plans_prefetch), len(plans_sequential))
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_prefetch_handles_no_next_snapshot(
             self,
             MockManifestFileManager,
-            MockManifestListManager,
-            MockSnapshotManager):
+            MockManifestListManager):
         """When no next snapshot exists, prefetch should return None gracefully."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_manifest_list_manager = MockManifestListManager.return_value
         mock_manifest_file_manager = MockManifestFileManager.return_value
         mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0}
@@ -391,18 +385,17 @@ class StreamingPrefetchTest(unittest.TestCase):
         plan = asyncio.run(get_one_plan())
         self.assertIsInstance(plan, Plan)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_prefetch_disabled_no_prefetch_future(
             self,
             MockManifestFileManager,
-            MockManifestListManager,
-            MockSnapshotManager):
+            MockManifestListManager):
         """With prefetch disabled, no prefetch future should be created."""
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_manifest_list_manager = MockManifestListManager.return_value
         mock_manifest_file_manager = MockManifestFileManager.return_value
         mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0}
@@ -437,12 +430,11 @@ class StreamingCatchUpDiffTest(unittest.TestCase):
     """Tests for diff-based catch-up optimization in AsyncStreamingTableScan."""
 
     @patch('pypaimon.read.streaming_table_scan.IncrementalDiffScanner')
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_stream_triggers_diff_catch_up_for_large_gap(
         self, MockManifestFileManager, MockManifestListManager,
-        MockSnapshotManager, MockDiffScanner
+        MockDiffScanner
     ):
         """
         When starting with a large gap, stream() should use diff scanner.
@@ -454,7 +446,8 @@ class StreamingCatchUpDiffTest(unittest.TestCase):
         """
         table, _ = _create_mock_table(latest_snapshot_id=100)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_diff_scanner = MockDiffScanner.return_value
 
         # Setup: latest is 100, start is 5 (gap=95)
@@ -493,19 +486,19 @@ class StreamingConsumerTest(unittest.TestCase):
     """Tests for consumer management integration in AsyncStreamingTableScan."""
 
     @patch('pypaimon.read.streaming_table_scan.ConsumerManager')
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_consumer_restores_next_snapshot_id(
         self, MockManifestFileManager, MockManifestListManager,
-        MockSnapshotManager, MockConsumerManager
+        MockConsumerManager
     ):
         """When consumer exists, stream() should resume from saved position."""
         from pypaimon.consumer.consumer import Consumer
 
         table, _ = _create_mock_table(latest_snapshot_id=10)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_consumer_manager = MockConsumerManager.return_value
         mock_manifest_list_manager = MockManifestListManager.return_value
 
@@ -541,12 +534,11 @@ class StreamingConsumerTest(unittest.TestCase):
         self.assertEqual(scan.next_snapshot_id, 9)
 
     @patch('pypaimon.read.streaming_table_scan.ConsumerManager')
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.FileScanner')
     def test_consumer_saves_after_yield(
         self, MockFileScanner, MockManifestListManager,
-        MockSnapshotManager, MockConsumerManager
+        MockConsumerManager
     ):
         """Consumer progress is flushed on the next __anext__() call after yielding a plan.
 
@@ -556,7 +548,8 @@ class StreamingConsumerTest(unittest.TestCase):
         """
         table, _ = _create_mock_table(latest_snapshot_id=5)
 
-        mock_snapshot_manager = MockSnapshotManager.return_value
+        mock_snapshot_manager = Mock()
+        table.snapshot_manager.return_value = mock_snapshot_manager
         mock_consumer_manager = MockConsumerManager.return_value
 
         # No existing consumer state
@@ -597,12 +590,10 @@ class StreamingConsumerTest(unittest.TestCase):
         self.assertEqual(call_args[0][0], "save-test")
         self.assertEqual(call_args[0][1].next_snapshot, 6)
 
-    @patch('pypaimon.read.streaming_table_scan.SnapshotManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestListManager')
     @patch('pypaimon.read.streaming_table_scan.ManifestFileManager')
     def test_no_consumer_when_consumer_id_not_set(
-        self, MockManifestFileManager, MockManifestListManager,
-        MockSnapshotManager
+        self, MockManifestFileManager, MockManifestListManager
     ):
         """Without consumer_id, no ConsumerManager should be created."""
         table, _ = _create_mock_table()
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 97d4ed9298..6eb7f5f5da 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -34,7 +34,6 @@ from pypaimon.read.scanner.file_scanner import FileScanner
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
                                                SnapshotCommit)
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.row.offset_row import OffsetRow
 from pypaimon.write.commit.commit_rollback import CommitRollback
@@ -85,7 +84,7 @@ class FileStoreCommit:
         self.table: FileStoreTable = table
         self.commit_user = commit_user
 
-        self.snapshot_manager = SnapshotManager(table)
+        self.snapshot_manager = table.snapshot_manager()
         self.manifest_file_manager = ManifestFileManager(table)
         self.manifest_list_manager = ManifestListManager(table)
 

Reply via email to