This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1e9c0b31ac [python] Introduce changelog_manager for pypaimon (#7492)
1e9c0b31ac is described below
commit 1e9c0b31ac2e80db4180bb05556eb3a3bf8be30a
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Mar 21 13:30:57 2026 +0800
[python] Introduce changelog_manager for pypaimon (#7492)
---
paimon-python/pypaimon/changelog/__init__.py | 23 ++
paimon-python/pypaimon/changelog/changelog.py | 85 +++++
.../pypaimon/changelog/changelog_manager.py | 353 +++++++++++++++++++++
paimon-python/pypaimon/snapshot/snapshot.py | 4 +
paimon-python/pypaimon/table/file_store_table.py | 5 +
.../pypaimon/tests/changelog_manager_test.py | 137 ++++++++
.../pypaimon/tests/table/file_store_table_test.py | 59 ++++
7 files changed, 666 insertions(+)
diff --git a/paimon-python/pypaimon/changelog/__init__.py
b/paimon-python/pypaimon/changelog/__init__.py
new file mode 100644
index 0000000000..a057af07dd
--- /dev/null
+++ b/paimon-python/pypaimon/changelog/__init__.py
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.changelog.changelog import Changelog
+from pypaimon.changelog.changelog_manager import ChangelogManager
+
+__all__ = ['Changelog', 'ChangelogManager']
diff --git a/paimon-python/pypaimon/changelog/changelog.py
b/paimon-python/pypaimon/changelog/changelog.py
new file mode 100644
index 0000000000..8e25fdc2e4
--- /dev/null
+++ b/paimon-python/pypaimon/changelog/changelog.py
@@ -0,0 +1,85 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.json_util import JSON
+from pypaimon.snapshot.snapshot import Snapshot
+
+logger = logging.getLogger(__name__)
+
+
class Changelog(Snapshot):
    """Metadata of a long-lived changelog.

    A Changelog carries exactly the same fields as a Snapshot. It is generated
    from a snapshot file during expiration so that the table's changelog can
    outlive the snapshot's own lifecycle.
    """

    @staticmethod
    def from_snapshot(snapshot: Snapshot) -> 'Changelog':
        """Build a Changelog holding a copy of every field of ``snapshot``."""
        # Field names accepted by the Snapshot constructor; copied one-to-one.
        copied = (
            "version",
            "id",
            "schema_id",
            "base_manifest_list",
            "base_manifest_list_size",
            "delta_manifest_list",
            "delta_manifest_list_size",
            "changelog_manifest_list",
            "changelog_manifest_list_size",
            "index_manifest",
            "commit_user",
            "commit_identifier",
            "commit_kind",
            "time_millis",
            "total_record_count",
            "delta_record_count",
            "changelog_record_count",
            "watermark",
            "statistics",
            "properties",
            "next_row_id",
        )
        return Changelog(**{name: getattr(snapshot, name) for name in copied})

    @staticmethod
    def from_json(json_str: str) -> 'Changelog':
        """Deserialize a Changelog from its JSON representation."""
        return JSON.from_json(json_str, Changelog)

    @staticmethod
    def from_path(file_io: FileIO, path: str) -> 'Changelog':
        """Load the Changelog stored at ``path``.

        Raises:
            RuntimeError: if the file is missing or cannot be read/parsed.
        """
        try:
            changelog = Changelog.try_from_path(file_io, path)
        except FileNotFoundError as missing:
            raise RuntimeError(f"Failed to read changelog from path {path}") from missing
        return changelog

    @staticmethod
    def try_from_path(file_io: FileIO, path: str) -> 'Changelog':
        """Load the Changelog stored at ``path``.

        Raises:
            FileNotFoundError: propagated unchanged when the file is missing.
            RuntimeError: for any other read or parse failure.
        """
        try:
            content = file_io.read_file_utf8(path)
            return Changelog.from_json(content)
        except FileNotFoundError:
            # Let callers distinguish "missing" from "unreadable".
            raise
        except Exception as cause:
            raise RuntimeError(f"Failed to read changelog from path {path}") from cause
diff --git a/paimon-python/pypaimon/changelog/changelog_manager.py
b/paimon-python/pypaimon/changelog/changelog_manager.py
new file mode 100644
index 0000000000..06f2c1b8ad
--- /dev/null
+++ b/paimon-python/pypaimon/changelog/changelog_manager.py
@@ -0,0 +1,353 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Iterator, List, Optional
+
+from pypaimon.branch.branch_manager import BranchManager
+from pypaimon.changelog.changelog import Changelog
+from pypaimon.common.file_io import FileIO
+
+logger = logging.getLogger(__name__)
+
+
class ChangelogManager:
    """Manager for the long-lived changelogs of a single table branch.

    Changelog files are stored as ``<branch path>/changelog/changelog-<id>``.
    The same directory holds the ``LATEST`` and ``EARLIEST`` hint files, which
    record the newest and oldest changelog IDs.
    """

    CHANGELOG_PREFIX = "changelog-"
    LATEST_HINT = "LATEST"
    EARLIEST_HINT = "EARLIEST"

    # Matches changelog file names such as ``changelog-42``. It is derived from
    # CHANGELOG_PREFIX so the directory listing and the path helpers agree.
    _CHANGELOG_FILE_PATTERN = re.compile(
        r'^' + re.escape(CHANGELOG_PREFIX) + r'(\d+)$')

    def __init__(self, file_io: FileIO, table_path: str, branch: Optional[str] = None):
        """Initialize ChangelogManager.

        Args:
            file_io: FileIO instance used for all file operations.
            table_path: Path to the table directory.
            branch: Branch name; ``None`` or an empty string selects ``'main'``.
        """
        # ``file_io`` is exposed as a public attribute. There is deliberately no
        # ``file_io()`` method: this instance attribute would shadow it, so such
        # a method could never be called.
        self.file_io = file_io
        self.table_path = table_path
        self.branch = BranchManager.normalize_branch(branch) if branch else 'main'

    # ------------------------------------------------------------------
    # Hint-based lookups
    # ------------------------------------------------------------------

    def latest_long_lived_changelog_id(self) -> Optional[int]:
        """Return the latest long-lived changelog ID, or None if there is none.

        The LATEST hint is used when it is still valid. Otherwise the
        changelog directory is listed.

        Raises:
            RuntimeError: if the lookup fails.
        """
        try:
            return self._find_latest_hint_id(self.changelog_directory())
        except Exception as e:
            raise RuntimeError("Failed to find latest changelog id") from e

    def earliest_long_lived_changelog_id(self) -> Optional[int]:
        """Return the earliest long-lived changelog ID, or None if there is none.

        The EARLIEST hint is used when it is still valid. Otherwise the
        changelog directory is listed.

        Raises:
            RuntimeError: if the lookup fails.
        """
        try:
            return self._find_earliest_hint_id(self.changelog_directory())
        except Exception as e:
            raise RuntimeError("Failed to find earliest changelog id") from e

    def long_lived_changelog_exists(self, snapshot_id: int) -> bool:
        """Return True if a long-lived changelog file exists for ``snapshot_id``.

        Raises:
            RuntimeError: if the existence check itself fails.
        """
        path = self.long_lived_changelog_path(snapshot_id)
        try:
            return self.file_io.exists(path)
        except Exception as e:
            raise RuntimeError(
                f"Failed to determine if changelog #{snapshot_id} exists in path {path}") from e

    # ------------------------------------------------------------------
    # Reading changelogs
    # ------------------------------------------------------------------

    def long_lived_changelog(self, snapshot_id: int) -> Changelog:
        """Load the long-lived changelog for ``snapshot_id``.

        Raises:
            RuntimeError: if the file is missing or cannot be read/parsed.
        """
        return Changelog.from_path(self.file_io, self.long_lived_changelog_path(snapshot_id))

    def changelog(self, snapshot_id: int) -> Changelog:
        """Load the changelog for ``snapshot_id``; same as ``long_lived_changelog``.

        Raises:
            RuntimeError: if the file is missing or cannot be read/parsed.
        """
        return Changelog.from_path(self.file_io, self.long_lived_changelog_path(snapshot_id))

    def try_get_changelog(self, snapshot_id: int) -> Changelog:
        """Load the changelog for ``snapshot_id``, reporting absence via an exception.

        This method never returns None.

        Raises:
            FileNotFoundError: if no changelog file exists for ``snapshot_id``
                (as propagated by the FileIO read).
            RuntimeError: if the file exists but cannot be read or parsed.
        """
        return Changelog.try_from_path(self.file_io, self.long_lived_changelog_path(snapshot_id))

    def changelogs(self) -> Iterator[Changelog]:
        """Yield every changelog in ascending ID order.

        Files are read lazily, one per iteration step. A file that vanishes
        between listing and reading raises RuntimeError.
        """
        for snapshot_id in self._list_changelog_ids():
            yield self.changelog(snapshot_id)

    def safely_get_all_changelogs(self) -> List[Changelog]:
        """Read all changelogs in parallel, skipping missing and empty files.

        Files that disappear after listing, or that are empty, are skipped
        (empty files are logged). Any other read or parse failure is raised.

        Returns:
            The successfully read changelogs, sorted by ascending ID. The list
            never contains None.

        Raises:
            RuntimeError: if a changelog file exists but cannot be read/parsed.
        """
        paths = [self.long_lived_changelog_path(sid) for sid in self._list_changelog_ids()]
        if not paths:
            return []

        def read_changelog(path: str) -> Optional[Changelog]:
            try:
                changelog_str = self.file_io.read_file_utf8(path)
                if not changelog_str or not changelog_str.strip():
                    logger.warning(f"Changelog file is empty, path: {path}")
                    return None
                return Changelog.from_json(changelog_str)
            except FileNotFoundError:
                # Tolerated: the file was listed but is gone now (presumably
                # removed concurrently, e.g. by expiration).
                return None
            except Exception as e:
                raise RuntimeError(f"Failed to read changelog from path {path}") from e

        result: List[Changelog] = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(read_changelog, path) for path in paths]
            for future in as_completed(futures):
                # result() re-raises any RuntimeError from read_changelog.
                changelog = future.result()
                if changelog is not None:
                    result.append(changelog)

        # Completion order is arbitrary, so restore ID order.
        result.sort(key=lambda c: c.id)
        return result

    # ------------------------------------------------------------------
    # Paths
    # ------------------------------------------------------------------

    def long_lived_changelog_path(self, snapshot_id: int) -> str:
        """Return the path of the long-lived changelog file for ``snapshot_id``."""
        return self._changelog_path_in(self.changelog_directory(), snapshot_id)

    def changelog_directory(self) -> str:
        """Return the changelog directory of this manager's branch."""
        return f"{self._branch_path()}/changelog"

    # ------------------------------------------------------------------
    # Writing changelogs and hints
    # ------------------------------------------------------------------

    def commit_changelog(self, changelog: Changelog, changelog_id: int) -> None:
        """Write ``changelog`` as changelog file ``changelog_id``, overwriting it."""
        path = self.long_lived_changelog_path(changelog_id)
        self.file_io.write_file(path, changelog.to_json(), True)

    def commit_long_lived_changelog_latest_hint(self, snapshot_id: int) -> None:
        """Record ``snapshot_id`` in the LATEST hint file."""
        self._commit_latest_hint(snapshot_id, self.changelog_directory())

    def commit_long_lived_changelog_earliest_hint(self, snapshot_id: int) -> None:
        """Record ``snapshot_id`` in the EARLIEST hint file."""
        self._commit_earliest_hint(snapshot_id, self.changelog_directory())

    def delete_latest_hint(self) -> None:
        """Delete the LATEST hint file; a missing file is not an error."""
        self._delete_latest_hint(self.changelog_directory())

    def delete_earliest_hint(self) -> None:
        """Delete the EARLIEST hint file; a missing file is not an error."""
        self._delete_earliest_hint(self.changelog_directory())

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _branch_path(self) -> str:
        """Return the root path of this manager's branch."""
        return BranchManager.branch_path(self.table_path, self.branch)

    def _changelog_path_in(self, directory: str, changelog_id: int) -> str:
        """Return the path of changelog file ``changelog_id`` inside ``directory``."""
        return f"{directory}/{self.CHANGELOG_PREFIX}{changelog_id}"

    def _list_changelog_ids(self, directory: Optional[str] = None) -> List[int]:
        """Return the sorted IDs of all changelog files in ``directory``.

        ``directory`` defaults to this branch's changelog directory. Listing
        failures (for example a directory that does not exist yet) are logged
        and yield an empty list.
        """
        if directory is None:
            directory = self.changelog_directory()
        changelog_ids = []
        try:
            for file_info in self.file_io.list_status(directory):
                filename = file_info.path.rstrip('/').split('/')[-1]
                match = self._CHANGELOG_FILE_PATTERN.match(filename)
                if match:
                    changelog_ids.append(int(match.group(1)))
        except Exception as e:
            logger.warning(f"Failed to list changelog files: {e}")
            return []
        return sorted(changelog_ids)

    def _read_hint(self, directory: str, hint_name: str) -> Optional[int]:
        """Return the integer stored in hint file ``hint_name``, or None.

        None is returned when the file is missing, empty, unreadable or not an
        integer. Read failures are logged rather than raised, because callers
        fall back to listing the directory.
        """
        hint_file = f"{directory}/{hint_name}"
        if not self.file_io.exists(hint_file):
            return None
        try:
            content = self.file_io.read_file_utf8(hint_file)
            if content and content.strip():
                return int(content.strip())
        except Exception as e:
            logger.warning(f"Failed to read {hint_name.lower()} hint from {hint_file}: {e}")
        return None

    def _find_latest_hint_id(self, directory: str) -> Optional[int]:
        """Find the latest changelog ID in ``directory``.

        The LATEST hint is trusted only if it is positive and no changelog file
        exists for the following ID. Otherwise (hint missing, invalid or stale)
        the largest listed ID is returned, or None if there are no changelogs.
        """
        hint_id = self._read_hint(directory, self.LATEST_HINT)
        if hint_id is not None and hint_id > 0:
            next_path = self._changelog_path_in(directory, hint_id + 1)
            if not self.file_io.exists(next_path):
                return hint_id
        changelog_ids = self._list_changelog_ids(directory)
        return changelog_ids[-1] if changelog_ids else None

    def _find_earliest_hint_id(self, directory: str) -> Optional[int]:
        """Find the earliest changelog ID in ``directory``.

        The EARLIEST hint is trusted only if its changelog file still exists.
        Otherwise the smallest listed ID is returned, or None if there are no
        changelogs.
        """
        hint_id = self._read_hint(directory, self.EARLIEST_HINT)
        if hint_id is not None and self.file_io.exists(self._changelog_path_in(directory, hint_id)):
            return hint_id
        changelog_ids = self._list_changelog_ids(directory)
        return changelog_ids[0] if changelog_ids else None

    def _write_hint(self, directory: str, hint_name: str, snapshot_id: int) -> None:
        """Write ``snapshot_id`` into hint file ``hint_name``."""
        # Hints are rewritten on every commit, so the existing file must be
        # overwritten (overwrite=False made every update after the first fail).
        self.file_io.write_file(f"{directory}/{hint_name}", str(snapshot_id), True)

    def _commit_latest_hint(self, snapshot_id: int, directory: str) -> None:
        """Write the LATEST hint file in ``directory``."""
        self._write_hint(directory, self.LATEST_HINT, snapshot_id)

    def _commit_earliest_hint(self, snapshot_id: int, directory: str) -> None:
        """Write the EARLIEST hint file in ``directory``."""
        self._write_hint(directory, self.EARLIEST_HINT, snapshot_id)

    def _delete_hint(self, directory: str, hint_name: str) -> None:
        """Delete hint file ``hint_name``; missing files are ignored, other errors logged."""
        hint_file = f"{directory}/{hint_name}"
        try:
            self.file_io.delete(hint_file)
        except FileNotFoundError:
            # Nothing to delete.
            pass
        except Exception as e:
            logger.warning(f"Failed to delete {hint_name.lower()} hint {hint_file}: {e}")

    def _delete_latest_hint(self, directory: str) -> None:
        """Delete the LATEST hint file in ``directory``."""
        self._delete_hint(directory, self.LATEST_HINT)

    def _delete_earliest_hint(self, directory: str) -> None:
        """Delete the EARLIEST hint file in ``directory``."""
        self._delete_hint(directory, self.EARLIEST_HINT)
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py
b/paimon-python/pypaimon/snapshot/snapshot.py
index f269a9ac85..6903ad9236 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -39,9 +39,13 @@ class Snapshot:
commit_kind: str = json_field("commitKind")
time_millis: int = json_field("timeMillis")
# Optional fields with defaults
+ base_manifest_list_size: Optional[int] =
optional_json_field("baseManifestListSize", "non_null")
+ delta_manifest_list_size: Optional[int] =
optional_json_field("deltaManifestListSize", "non_null")
changelog_manifest_list: Optional[str] =
optional_json_field("changelogManifestList", "non_null")
+ changelog_manifest_list_size: Optional[int] =
optional_json_field("changelogManifestListSize", "non_null")
index_manifest: Optional[str] = optional_json_field("indexManifest",
"non_null")
changelog_record_count: Optional[int] =
optional_json_field("changelogRecordCount", "non_null")
watermark: Optional[int] = optional_json_field("watermark", "non_null")
statistics: Optional[str] = optional_json_field("statistics", "non_null")
next_row_id: Optional[int] = optional_json_field("nextRowId", "non_null")
+ properties: Optional[dict] = optional_json_field("properties", "non_null")
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index bb9587cc28..51a031bd94 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -123,6 +123,11 @@ class FileStoreTable(Table):
current_branch
)
+ def changelog_manager(self):
+ """Get the changelog manager for this table."""
+ from pypaimon.changelog.changelog_manager import ChangelogManager
+ return ChangelogManager(self.file_io, self.table_path,
self.current_branch())
+
def create_tag(
self,
tag_name: str,
diff --git a/paimon-python/pypaimon/tests/changelog_manager_test.py
b/paimon-python/pypaimon/tests/changelog_manager_test.py
new file mode 100644
index 0000000000..63d9157e5e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/changelog_manager_test.py
@@ -0,0 +1,137 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.changelog import Changelog, ChangelogManager
+
+import pyarrow as pa
+
+
class TestChangelogManager(unittest.TestCase):
    """Tests for ChangelogManager path handling and Changelog construction."""

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({
            'warehouse': cls.warehouse
        })
        cls.catalog.create_database('default', True)

        cls.pa_schema = pa.schema([
            ('user_id', pa.int32()),
            ('item_id', pa.int64()),
            ('behavior', pa.string()),
            ('dt', pa.string())
        ])
        schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['dt'],
                                            options={"bucket": "2"})
        cls.catalog.create_table('default.test_changelog_table', schema, False)
        cls.table = cls.catalog.get_table('default.test_changelog_table')

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _manager(self, branch=None):
        """Create a ChangelogManager for the shared test table."""
        return ChangelogManager(self.table.file_io, self.table.table_path, branch=branch)

    def test_changelog_manager_initialization(self):
        """Test that ChangelogManager can be initialized."""
        changelog_manager = ChangelogManager(
            self.table.file_io,
            self.table.table_path
        )
        self.assertIsNotNone(changelog_manager)
        self.assertEqual(changelog_manager.branch, 'main')

    def test_changelog_manager_with_branch(self):
        """Test that ChangelogManager can be initialized with a custom branch."""
        changelog_manager = self._manager(branch='feature')
        self.assertIsNotNone(changelog_manager)
        self.assertEqual(changelog_manager.branch, 'feature')

    def test_changelog_directory_path(self):
        """Test that changelog directory path is correct."""
        expected = f"{self.table.table_path}/changelog"
        self.assertEqual(self._manager().changelog_directory(), expected)

    def test_changelog_directory_path_with_branch(self):
        """Test that changelog directory path includes branch."""
        expected = f"{self.table.table_path}/branch/branch-feature/changelog"
        self.assertEqual(self._manager(branch='feature').changelog_directory(), expected)

    def test_long_lived_changelog_path(self):
        """Test that long-lived changelog path is correct."""
        expected = f"{self.table.table_path}/changelog/changelog-123"
        self.assertEqual(self._manager().long_lived_changelog_path(123), expected)

    def test_latest_long_lived_changelog_id_none(self):
        """Test that latest changelog ID is None when no changelog exists."""
        # No changelog files are ever written for the shared table.
        self.assertIsNone(self._manager().latest_long_lived_changelog_id())

    def test_earliest_long_lived_changelog_id_none(self):
        """Test that earliest changelog ID is None when no changelog exists."""
        self.assertIsNone(self._manager().earliest_long_lived_changelog_id())

    def test_long_lived_changelog_exists_false(self):
        """Test that no changelog file is reported for an unknown ID."""
        self.assertFalse(self._manager().long_lived_changelog_exists(1))

    def test_safely_get_all_changelogs_empty(self):
        """Test that reading all changelogs of a table without any yields nothing."""
        manager = self._manager()
        self.assertEqual(manager.safely_get_all_changelogs(), [])
        self.assertEqual(list(manager.changelogs()), [])

    def test_changelog_from_snapshot(self):
        """Test that Changelog.from_snapshot copies every snapshot field."""
        from pypaimon.snapshot.snapshot import Snapshot

        # Build the snapshot explicitly. The shared table is never written to,
        # so relying on its latest snapshot would make this test assert nothing.
        snapshot = Snapshot(
            version=3,
            id=7,
            schema_id=0,
            base_manifest_list='manifest-list-base',
            base_manifest_list_size=11,
            delta_manifest_list='manifest-list-delta',
            delta_manifest_list_size=22,
            changelog_manifest_list='manifest-list-changelog',
            changelog_manifest_list_size=33,
            index_manifest='index-manifest',
            commit_user='test-user',
            commit_identifier=42,
            commit_kind='APPEND',
            time_millis=1700000000000,
            total_record_count=100,
            delta_record_count=10,
            changelog_record_count=5,
            watermark=123,
            statistics='stats',
            properties={'k': 'v'},
            next_row_id=9,
        )

        changelog = Changelog.from_snapshot(snapshot)

        self.assertIsInstance(changelog, Changelog)
        self.assertIsInstance(changelog, Snapshot)
        for name in ('version', 'id', 'schema_id', 'base_manifest_list',
                     'base_manifest_list_size', 'delta_manifest_list',
                     'delta_manifest_list_size', 'changelog_manifest_list',
                     'changelog_manifest_list_size', 'index_manifest',
                     'commit_user', 'commit_identifier', 'commit_kind',
                     'time_millis', 'total_record_count', 'delta_record_count',
                     'changelog_record_count', 'watermark', 'statistics',
                     'properties', 'next_row_id'):
            self.assertEqual(getattr(changelog, name), getattr(snapshot, name), name)


if __name__ == '__main__':
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/table/file_store_table_test.py
b/paimon-python/pypaimon/tests/table/file_store_table_test.py
index 9406c884cd..d30846a2ae 100644
--- a/paimon-python/pypaimon/tests/table/file_store_table_test.py
+++ b/paimon-python/pypaimon/tests/table/file_store_table_test.py
@@ -239,6 +239,65 @@ class FileStoreTableTest(unittest.TestCase):
# Restore original catalog environment
self.table.catalog_environment = original_env
+ def test_changelog_manager(self):
+ """Test that FileStoreTable has changelog_manager method."""
+ # Get changelog_manager
+ changelog_manager = self.table.changelog_manager()
+
+ # Verify changelog_manager type
+ from pypaimon.changelog.changelog_manager import ChangelogManager
+ self.assertIsInstance(changelog_manager, ChangelogManager)
+
+ # Verify changelog_manager has correct branch
+ from pypaimon.branch.branch_manager import DEFAULT_MAIN_BRANCH
+ self.assertEqual(self.table.current_branch(), DEFAULT_MAIN_BRANCH)
+ self.assertEqual(changelog_manager.branch, DEFAULT_MAIN_BRANCH)
+
+ def test_changelog_manager_with_branch(self):
+ """Test changelog_manager with branch option."""
+ # Create table with branch option
+ branch_name = "feature"
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ partition_keys=['dt'],
+ options={
+ CoreOptions.BUCKET.key(): "2",
+ "branch": branch_name
+ }
+ )
+ self.catalog.create_table('default.test_changelog_branch_table',
schema, False)
+ branch_table =
self.catalog.get_table('default.test_changelog_branch_table')
+
+ # Get changelog_manager and verify it has correct branch
+ branch_changelog_manager = branch_table.changelog_manager()
+ self.assertEqual(branch_table.current_branch(), branch_name)
+ self.assertEqual(branch_changelog_manager.branch, branch_name)
+
+ # Verify changelog directory path is correct
+ expected_changelog_dir =
f"{branch_table.table_path}/branch/branch-feature/changelog"
+ self.assertEqual(branch_changelog_manager.changelog_directory(),
expected_changelog_dir)
+
+ def test_changelog_manager_path_generation(self):
+ """Test that changelog_manager generates correct paths."""
+ changelog_manager = self.table.changelog_manager()
+
+ # Test changelog directory path
+ expected_dir = f"{self.table.table_path}/changelog"
+ self.assertEqual(changelog_manager.changelog_directory(), expected_dir)
+
+ # Test changelog file path
+ snapshot_id = 123
+ expected_path =
f"{self.table.table_path}/changelog/changelog-{snapshot_id}"
+
self.assertEqual(changelog_manager.long_lived_changelog_path(snapshot_id),
expected_path)
+
+ def test_changelog_manager_latest_and_earliest_none(self):
+ """Test that latest and earliest changelog IDs are None when no
changelog exists."""
+ changelog_manager = self.table.changelog_manager()
+
+ # No changelog files should exist yet
+ self.assertIsNone(changelog_manager.latest_long_lived_changelog_id())
+ self.assertIsNone(changelog_manager.earliest_long_lived_changelog_id())
+
def test_current_branch(self):
"""Test that current_branch returns the branch from options."""
from pypaimon.branch.branch_manager import DEFAULT_MAIN_BRANCH