This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 348c99b5dc [python] Add CommitCallback support for post-commit hooks (#7925)
348c99b5dc is described below
commit 348c99b5dc713a97dc6e9922ee72b057dca08fc6
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 21 20:15:17 2026 +0800
[python] Add CommitCallback support for post-commit hooks (#7925)
---
docs/content/pypaimon/python-api.md | 39 +++
.../pypaimon/tests/write/commit_callback_test.py | 266 +++++++++++++++++++++
paimon-python/pypaimon/write/commit_callback.py | 51 ++++
paimon-python/pypaimon/write/file_store_commit.py | 20 +-
paimon-python/pypaimon/write/table_commit.py | 10 +-
5 files changed, 384 insertions(+), 2 deletions(-)
diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md
index 2c18cf060a..ee44bcd16c 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -285,6 +285,45 @@ write_builder = table.new_batch_write_builder().overwrite()
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```
+### Commit Callback
+
+You can register `CommitCallback` instances on a `TableCommit` to be notified after each successful
+snapshot commit. This is useful for post-commit actions such as syncing metadata to external systems.
+
+Implementations must be **idempotent** — a callback may be invoked more than once for the same commit
+if a failure occurs right after the commit succeeds.
+
+```python
+from pypaimon.write.commit_callback import CommitCallback, CommitCallbackContext
+
+class MyCallback(CommitCallback):
+ def call(self, context: CommitCallbackContext) -> None:
+ print(f"Committed snapshot {context.snapshot.id}, "
+ f"{len(context.commit_entries)} entries, "
+ f"identifier {context.identifier}")
+
+ def close(self) -> None:
+ pass # release resources if needed
+
+write_builder = table.new_batch_write_builder()
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+table_commit.add_commit_callback(MyCallback())
+
+table_write.write_arrow(data)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+```
+
+`CommitCallbackContext` provides:
+
+| Field | Type | Description |
+|------------------|---------------------|------------------------------------------|
+| `snapshot` | `Snapshot` | The committed snapshot (id, commit_kind, time_millis, next_row_id, …) |
+| `commit_entries` | `List[ManifestEntry]` | Delta manifest entries in this commit (each carries `file.first_row_id` when row-tracking is enabled) |
+| `identifier` | `int` | Commit identifier |
+
## Batch Read
### Predicate pushdown
diff --git a/paimon-python/pypaimon/tests/write/commit_callback_test.py b/paimon-python/pypaimon/tests/write/commit_callback_test.py
new file mode 100644
index 0000000000..4e02bf32f6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/commit_callback_test.py
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.write.commit_callback import CommitCallback,
CommitCallbackContext
+
+
+class RecordingCallback(CommitCallback):
+ """Test callback that records all invocations."""
+
+ def __init__(self):
+ # Every context handed to call(), in invocation order.
+ self.contexts = []
+ # Set to True once close() has been called.
+ self.closed = False
+
+ def call(self, context: CommitCallbackContext) -> None:
+ # Only record the context; the tests make the assertions.
+ self.contexts.append(context)
+
+ def close(self) -> None:
+ # Remember that the owner closed this callback.
+ self.closed = True
+
+
+class CommitCallbackTest(unittest.TestCase):
+ # Exercises CommitCallback registration, invocation and close through
+ # TableCommit, against tables created in a temporary local warehouse.
+
+ @classmethod
+ def setUpClass(cls):
+ # One temporary warehouse and catalog are shared by all tests in this
+ # class; each test creates its own, uniquely named table in it.
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+ cls.pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('dt', pa.string()),
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ # Remove the temporary warehouse; errors during cleanup are ignored.
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create_table(self, table_name, partition_keys=None, options=None):
+ # Helper: create `default.<table_name>` with the shared schema and the
+ # given partition keys / options, then load and return the table.
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema, partition_keys=partition_keys or [],
+ options=options or {})
+ self.catalog.create_table(f'default.{table_name}', schema, False)
+ return self.catalog.get_table(f'default.{table_name}')
+
+ def test_callback_invoked_on_commit(self):
+ # A single batch commit must invoke the callback exactly once with the
+ # first snapshot (id 1, kind APPEND) and a non-empty entry list.
+ table = self._create_table('test_callback_invoked')
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'name': ['a', 'b'],
+ 'dt': ['p1', 'p1'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+
+ self.assertEqual(1, len(callback.contexts))
+ ctx = callback.contexts[0]
+ self.assertEqual(1, ctx.snapshot.id)
+ self.assertEqual('APPEND', ctx.snapshot.commit_kind)
+ self.assertGreater(len(ctx.commit_entries), 0)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_callback_receives_correct_snapshot_data(self):
+ # Three rows written across two `dt` partitions: the snapshot seen by
+ # the callback must report 3 delta records and 3 total records.
+ table = self._create_table('test_callback_snapshot_data',
partition_keys=['dt'])
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'dt': ['p1', 'p1', 'p2'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+
+ ctx = callback.contexts[0]
+ self.assertEqual(3, ctx.snapshot.delta_record_count)
+ self.assertEqual(3, ctx.snapshot.total_record_count)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_multiple_callbacks(self):
+ # Two callbacks registered on the same commit must each be invoked
+ # once and must observe the same snapshot id.
+ table = self._create_table('test_multi_callbacks')
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ cb1 = RecordingCallback()
+ cb2 = RecordingCallback()
+ table_commit.add_commit_callback(cb1)
+ table_commit.add_commit_callback(cb2)
+
+ data = pa.Table.from_pydict({
+ 'id': [1],
+ 'name': ['a'],
+ 'dt': ['p1'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+
+ self.assertEqual(1, len(cb1.contexts))
+ self.assertEqual(1, len(cb2.contexts))
+ self.assertEqual(cb1.contexts[0].snapshot.id,
cb2.contexts[0].snapshot.id)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_callback_close_on_commit_close(self):
+ # Closing the TableCommit must close registered callbacks, even when
+ # nothing was ever committed.
+ table = self._create_table('test_callback_close')
+ write_builder = table.new_batch_write_builder()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ self.assertFalse(callback.closed)
+ table_commit.close()
+ self.assertTrue(callback.closed)
+
+ def test_callback_not_invoked_when_no_data(self):
+ # Committing without having written any data must not invoke the
+ # callback.
+ table = self._create_table('test_callback_no_data')
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ table_commit.commit(table_write.prepare_commit())
+
+ self.assertEqual(0, len(callback.contexts))
+
+ table_write.close()
+ table_commit.close()
+
+ def test_stream_commit_callback_multiple_rounds(self):
+ # Three stream commits with identifiers 0..2 must yield three callback
+ # invocations whose snapshot ids are 1, 2, 3 in order.
+ table = self._create_table('test_stream_callback')
+ write_builder = table.new_stream_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ for i in range(3):
+ data = pa.Table.from_pydict({
+ 'id': [i],
+ 'name': [f'name_{i}'],
+ 'dt': ['p1'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit(i),
commit_identifier=i)
+
+ self.assertEqual(3, len(callback.contexts))
+ for i, ctx in enumerate(callback.contexts):
+ self.assertEqual(i + 1, ctx.snapshot.id)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_data_evolution_callback_sees_row_id(self):
+ # With row tracking and data evolution enabled, every entry passed to
+ # the callback must carry a first_row_id, and after the first commit
+ # next_row_id must equal the number of rows written (3).
+ table = self._create_table('test_de_row_id', options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'dt': ['p1', 'p2', 'p3'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+
+ ctx = callback.contexts[0]
+ self.assertIsNotNone(ctx.snapshot.next_row_id)
+ for entry in ctx.commit_entries:
+ self.assertIsNotNone(entry.file.first_row_id)
+
+ total_rows = sum(e.file.row_count for e in ctx.commit_entries)
+ self.assertEqual(3, total_rows)
+ self.assertEqual(total_rows, ctx.snapshot.next_row_id)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_data_evolution_callback_row_id_increments_across_commits(self):
+ # Across three stream commits of two rows each, the row ids exposed to
+ # the callback must never fall below the previous commit's next_row_id.
+ table = self._create_table('test_de_row_id_incr', options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ write_builder = table.new_stream_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ callback = RecordingCallback()
+ table_commit.add_commit_callback(callback)
+
+ for i in range(3):
+ data = pa.Table.from_pydict({
+ 'id': [i * 2, i * 2 + 1],
+ 'name': [f'a{i}', f'b{i}'],
+ 'dt': ['p1', 'p1'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit(i),
commit_identifier=i)
+
+ self.assertEqual(3, len(callback.contexts))
+
+ # Row IDs must be assigned and monotonically increasing across commits
+ prev_next_row_id = 0
+ for ctx in callback.contexts:
+ for entry in ctx.commit_entries:
+ self.assertIsNotNone(entry.file.first_row_id)
+ self.assertGreaterEqual(entry.file.first_row_id,
prev_next_row_id)
+ self.assertGreater(ctx.snapshot.next_row_id, prev_next_row_id)
+ prev_next_row_id = ctx.snapshot.next_row_id
+
+ table_write.close()
+ table_commit.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/commit_callback.py b/paimon-python/pypaimon/write/commit_callback.py
new file mode 100644
index 0000000000..fd09aa67f3
--- /dev/null
+++ b/paimon-python/pypaimon/write/commit_callback.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+@dataclass
+class CommitCallbackContext:
+ """Data handed to ``CommitCallback.call`` after a successful commit.
+
+ A callback might be called multiple times for the same commit if a
+ failure occurs right after the commit, so consumers must handle the
+ context idempotently.
+ """
+
+ # The snapshot produced by the commit.
+ snapshot: Snapshot
+ # Manifest entries that were committed in this snapshot.
+ commit_entries: List[ManifestEntry]
+ # Identifier of the commit.
+ identifier: int
+
+
+class CommitCallback(ABC):
+ """Callback invoked after a list of commit entries is successfully committed.
+
+ Implementations must be idempotent because the callback might be called
+ multiple times if a failure occurs right after the commit.
+ """
+
+ @abstractmethod
+ def call(self, context: CommitCallbackContext) -> None:
+ """Invoked after a snapshot is successfully committed.
+
+ Args:
+ context: The committed snapshot, its commit entries and the
+ commit identifier.
+ """
+
+ def close(self) -> None:
+ """Release any resources held by this callback.
+
+ The default implementation does nothing; override it when the
+ callback holds resources that need releasing.
+ """
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index d88beb164e..486e289240 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -39,6 +39,7 @@ from pypaimon.table.row.offset_row import OffsetRow
from pypaimon.write.commit.commit_rollback import CommitRollback
from pypaimon.write.commit.commit_scanner import CommitScanner
from pypaimon.write.commit.conflict_detection import ConflictDetection
+from pypaimon.write.commit_callback import CommitCallback,
CommitCallbackContext
from pypaimon.write.commit_message import CommitMessage
logger = logging.getLogger(__name__)
@@ -77,12 +78,14 @@ class FileStoreCommit:
org.apache.paimon.operation.FileStoreCommitImpl in Java.
"""
- def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user:
str):
+ def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user:
str,
+ commit_callbacks: Optional[List[CommitCallback]] = None):
from pypaimon.table.file_store_table import FileStoreTable
self.snapshot_commit = snapshot_commit
self.table: FileStoreTable = table
self.commit_user = commit_user
+ self.commit_callbacks: List[CommitCallback] = commit_callbacks if
commit_callbacks is not None else []
self.snapshot_manager = table.snapshot_manager()
self.manifest_file_manager = ManifestFileManager(table)
@@ -436,6 +439,16 @@ class FileStoreCommit:
commit_identifier,
commit_kind,
)
+
+ if self.commit_callbacks:
+ context = CommitCallbackContext(
+ snapshot=snapshot_data,
+ commit_entries=commit_entries,
+ identifier=commit_identifier,
+ )
+ for callback in self.commit_callbacks:
+ callback.call(context)
+
return SuccessResult()
def _write_manifest_file(self, commit_entries, new_manifest_file):
@@ -626,6 +639,11 @@ class FileStoreCommit:
def close(self):
"""Close the FileStoreCommit and release resources."""
+ for callback in self.commit_callbacks:
+ try:
+ callback.close()
+ except Exception:
+ pass
if hasattr(self.snapshot_commit, 'close'):
self.snapshot_commit.close()
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 54298f3b19..4a758cb176 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -21,6 +21,7 @@ from typing import Dict, List, Optional
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
logger = logging.getLogger(__name__)
+from pypaimon.write.commit_callback import CommitCallback
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit
@@ -50,7 +51,14 @@ class TableCommit:
if snapshot_commit is None:
raise RuntimeError("Table does not provide a SnapshotCommit
instance")
- self.file_store_commit = FileStoreCommit(snapshot_commit, table,
commit_user)
+ self._commit_callbacks: List[CommitCallback] = []
+ self.file_store_commit = FileStoreCommit(
+ snapshot_commit, table, commit_user,
+ commit_callbacks=self._commit_callbacks)
+
+ def add_commit_callback(self, callback: CommitCallback) -> None:
+ """Register a callback to be invoked after each successful commit.
+
+ Args:
+ callback: The callback to append to this commit's callback list.
+ """
+ # This list is the same object that was passed to FileStoreCommit in
+ # __init__, so a callback appended here is visible to later commits.
+ self._commit_callbacks.append(callback)
def _commit(self, commit_messages: List[CommitMessage], commit_identifier:
int = BATCH_COMMIT_IDENTIFIER):
non_empty_messages = [msg for msg in commit_messages if not
msg.is_empty()]