This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 348c99b5dc [python] Add CommitCallback support for post-commit hooks (#7925)
348c99b5dc is described below

commit 348c99b5dc713a97dc6e9922ee72b057dca08fc6
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 21 20:15:17 2026 +0800

    [python] Add CommitCallback support for post-commit hooks (#7925)
---
 docs/content/pypaimon/python-api.md                |  39 +++
 .../pypaimon/tests/write/commit_callback_test.py   | 266 +++++++++++++++++++++
 paimon-python/pypaimon/write/commit_callback.py    |  51 ++++
 paimon-python/pypaimon/write/file_store_commit.py  |  20 +-
 paimon-python/pypaimon/write/table_commit.py       |  10 +-
 5 files changed, 384 insertions(+), 2 deletions(-)

diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md
index 2c18cf060a..ee44bcd16c 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -285,6 +285,45 @@ write_builder = table.new_batch_write_builder().overwrite()
 write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
 ```
 
+### Commit Callback
+
+You can register `CommitCallback` instances on a `TableCommit` to be notified after each successful
+snapshot commit. This is useful for post-commit actions such as syncing metadata to external systems.
+
+Implementations must be **idempotent** — a callback may be invoked more than once for the same commit
+if a failure occurs right after the commit succeeds.
+
+```python
+from pypaimon.write.commit_callback import CommitCallback, 
CommitCallbackContext
+
+class MyCallback(CommitCallback):
+    def call(self, context: CommitCallbackContext) -> None:
+        print(f"Committed snapshot {context.snapshot.id}, "
+              f"{len(context.commit_entries)} entries, "
+              f"identifier {context.identifier}")
+
+    def close(self) -> None:
+        pass  # release resources if needed
+
+write_builder = table.new_batch_write_builder()
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+table_commit.add_commit_callback(MyCallback())
+
+table_write.write_arrow(data)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+```
+
+`CommitCallbackContext` provides:
+
+| Field            | Type                | Description                              |
+|------------------|---------------------|------------------------------------------|
+| `snapshot`       | `Snapshot`          | The committed snapshot (id, commit_kind, time_millis, next_row_id, …) |
+| `commit_entries` | `List[ManifestEntry]` | Delta manifest entries in this commit (each carries `file.first_row_id` when row-tracking is enabled) |
+| `identifier`     | `int`               | Commit identifier                        |
+
 ## Batch Read
 
 ### Predicate pushdown
diff --git a/paimon-python/pypaimon/tests/write/commit_callback_test.py b/paimon-python/pypaimon/tests/write/commit_callback_test.py
new file mode 100644
index 0000000000..4e02bf32f6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/commit_callback_test.py
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.write.commit_callback import CommitCallback, 
CommitCallbackContext
+
+
+class RecordingCallback(CommitCallback):
+    """Test callback that records all invocations."""
+
+    def __init__(self):
+        self.contexts = []
+        self.closed = False
+
+    def call(self, context: CommitCallbackContext) -> None:
+        self.contexts.append(context)
+
+    def close(self) -> None:
+        self.closed = True
+
+
+class CommitCallbackTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+        cls.pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('dt', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, table_name, partition_keys=None, options=None):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema, partition_keys=partition_keys or [],
+            options=options or {})
+        self.catalog.create_table(f'default.{table_name}', schema, False)
+        return self.catalog.get_table(f'default.{table_name}')
+
+    def test_callback_invoked_on_commit(self):
+        table = self._create_table('test_callback_invoked')
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'name': ['a', 'b'],
+            'dt': ['p1', 'p1'],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+
+        self.assertEqual(1, len(callback.contexts))
+        ctx = callback.contexts[0]
+        self.assertEqual(1, ctx.snapshot.id)
+        self.assertEqual('APPEND', ctx.snapshot.commit_kind)
+        self.assertGreater(len(ctx.commit_entries), 0)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_callback_receives_correct_snapshot_data(self):
+        table = self._create_table('test_callback_snapshot_data', 
partition_keys=['dt'])
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'dt': ['p1', 'p1', 'p2'],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+
+        ctx = callback.contexts[0]
+        self.assertEqual(3, ctx.snapshot.delta_record_count)
+        self.assertEqual(3, ctx.snapshot.total_record_count)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_multiple_callbacks(self):
+        table = self._create_table('test_multi_callbacks')
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        cb1 = RecordingCallback()
+        cb2 = RecordingCallback()
+        table_commit.add_commit_callback(cb1)
+        table_commit.add_commit_callback(cb2)
+
+        data = pa.Table.from_pydict({
+            'id': [1],
+            'name': ['a'],
+            'dt': ['p1'],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+
+        self.assertEqual(1, len(cb1.contexts))
+        self.assertEqual(1, len(cb2.contexts))
+        self.assertEqual(cb1.contexts[0].snapshot.id, 
cb2.contexts[0].snapshot.id)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_callback_close_on_commit_close(self):
+        table = self._create_table('test_callback_close')
+        write_builder = table.new_batch_write_builder()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        self.assertFalse(callback.closed)
+        table_commit.close()
+        self.assertTrue(callback.closed)
+
+    def test_callback_not_invoked_when_no_data(self):
+        table = self._create_table('test_callback_no_data')
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        table_commit.commit(table_write.prepare_commit())
+
+        self.assertEqual(0, len(callback.contexts))
+
+        table_write.close()
+        table_commit.close()
+
+    def test_stream_commit_callback_multiple_rounds(self):
+        table = self._create_table('test_stream_callback')
+        write_builder = table.new_stream_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        for i in range(3):
+            data = pa.Table.from_pydict({
+                'id': [i],
+                'name': [f'name_{i}'],
+                'dt': ['p1'],
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit(i), 
commit_identifier=i)
+
+        self.assertEqual(3, len(callback.contexts))
+        for i, ctx in enumerate(callback.contexts):
+            self.assertEqual(i + 1, ctx.snapshot.id)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_data_evolution_callback_sees_row_id(self):
+        table = self._create_table('test_de_row_id', options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'dt': ['p1', 'p2', 'p3'],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+
+        ctx = callback.contexts[0]
+        self.assertIsNotNone(ctx.snapshot.next_row_id)
+        for entry in ctx.commit_entries:
+            self.assertIsNotNone(entry.file.first_row_id)
+
+        total_rows = sum(e.file.row_count for e in ctx.commit_entries)
+        self.assertEqual(3, total_rows)
+        self.assertEqual(total_rows, ctx.snapshot.next_row_id)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_data_evolution_callback_row_id_increments_across_commits(self):
+        table = self._create_table('test_de_row_id_incr', options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        write_builder = table.new_stream_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        callback = RecordingCallback()
+        table_commit.add_commit_callback(callback)
+
+        for i in range(3):
+            data = pa.Table.from_pydict({
+                'id': [i * 2, i * 2 + 1],
+                'name': [f'a{i}', f'b{i}'],
+                'dt': ['p1', 'p1'],
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit(i), 
commit_identifier=i)
+
+        self.assertEqual(3, len(callback.contexts))
+
+        # Row IDs must be assigned and monotonically increasing across commits
+        prev_next_row_id = 0
+        for ctx in callback.contexts:
+            for entry in ctx.commit_entries:
+                self.assertIsNotNone(entry.file.first_row_id)
+                self.assertGreaterEqual(entry.file.first_row_id, 
prev_next_row_id)
+            self.assertGreater(ctx.snapshot.next_row_id, prev_next_row_id)
+            prev_next_row_id = ctx.snapshot.next_row_id
+
+        table_write.close()
+        table_commit.close()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/commit_callback.py b/paimon-python/pypaimon/write/commit_callback.py
new file mode 100644
index 0000000000..fd09aa67f3
--- /dev/null
+++ b/paimon-python/pypaimon/write/commit_callback.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+@dataclass
+class CommitCallbackContext:
+    """Context passed to CommitCallback after a successful commit.
+
+    Callbacks receiving this context must be idempotent because they might be
+    called multiple times if a failure occurs right after the commit.
+    """
+
+    snapshot: Snapshot
+    commit_entries: List[ManifestEntry]
+    identifier: int
+
+
+class CommitCallback(ABC):
+    """Callback invoked after a list of commit entries is successfully committed.
+
+    Implementations must be idempotent because the callback might be called
+    multiple times if a failure occurs right after the commit.
+    """
+
+    @abstractmethod
+    def call(self, context: CommitCallbackContext) -> None:
+        """Invoked after a snapshot is successfully committed."""
+
+    def close(self) -> None:
+        """Release any resources held by this callback."""
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index d88beb164e..486e289240 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -39,6 +39,7 @@ from pypaimon.table.row.offset_row import OffsetRow
 from pypaimon.write.commit.commit_rollback import CommitRollback
 from pypaimon.write.commit.commit_scanner import CommitScanner
 from pypaimon.write.commit.conflict_detection import ConflictDetection
+from pypaimon.write.commit_callback import CommitCallback, 
CommitCallbackContext
 from pypaimon.write.commit_message import CommitMessage
 
 logger = logging.getLogger(__name__)
@@ -77,12 +78,14 @@ class FileStoreCommit:
     org.apache.paimon.operation.FileStoreCommitImpl in Java.
     """
 
-    def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: 
str):
+    def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: 
str,
+                 commit_callbacks: Optional[List[CommitCallback]] = None):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.snapshot_commit = snapshot_commit
         self.table: FileStoreTable = table
         self.commit_user = commit_user
+        self.commit_callbacks: List[CommitCallback] = commit_callbacks if 
commit_callbacks is not None else []
 
         self.snapshot_manager = table.snapshot_manager()
         self.manifest_file_manager = ManifestFileManager(table)
@@ -436,6 +439,16 @@ class FileStoreCommit:
             commit_identifier,
             commit_kind,
         )
+
+        if self.commit_callbacks:
+            context = CommitCallbackContext(
+                snapshot=snapshot_data,
+                commit_entries=commit_entries,
+                identifier=commit_identifier,
+            )
+            for callback in self.commit_callbacks:
+                callback.call(context)
+
         return SuccessResult()
 
     def _write_manifest_file(self, commit_entries, new_manifest_file):
@@ -626,6 +639,11 @@ class FileStoreCommit:
 
     def close(self):
         """Close the FileStoreCommit and release resources."""
+        for callback in self.commit_callbacks:
+            try:
+                callback.close()
+            except Exception:
+                pass
         if hasattr(self.snapshot_commit, 'close'):
             self.snapshot_commit.close()
 
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 54298f3b19..4a758cb176 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -21,6 +21,7 @@ from typing import Dict, List, Optional
 from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
 
 logger = logging.getLogger(__name__)
+from pypaimon.write.commit_callback import CommitCallback
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_commit import FileStoreCommit
 
@@ -50,7 +51,14 @@ class TableCommit:
         if snapshot_commit is None:
             raise RuntimeError("Table does not provide a SnapshotCommit 
instance")
 
-        self.file_store_commit = FileStoreCommit(snapshot_commit, table, 
commit_user)
+        self._commit_callbacks: List[CommitCallback] = []
+        self.file_store_commit = FileStoreCommit(
+            snapshot_commit, table, commit_user,
+            commit_callbacks=self._commit_callbacks)
+
+    def add_commit_callback(self, callback: CommitCallback) -> None:
+        """Register a callback to be invoked after each successful commit."""
+        self._commit_callbacks.append(callback)
 
     def _commit(self, commit_messages: List[CommitMessage], commit_identifier: 
int = BATCH_COMMIT_IDENTIFIER):
         non_empty_messages = [msg for msg in commit_messages if not 
msg.is_empty()]

Reply via email to