This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new af7be8440f [Core] Custom commit callback support to be initialized 
with table (#6004)
af7be8440f is described below

commit af7be8440f504edac8ec097f926fe0b88af8301b
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Fri Aug 1 12:27:36 2025 +0800

    [Core] Custom commit callback support to be initialized with table (#6004)
---
 .../src/main/java/org/apache/paimon/AbstractFileStore.java       | 2 +-
 .../main/java/org/apache/paimon/table/sink/CallbackUtils.java    | 9 +++++++--
 .../main/java/org/apache/paimon/table/sink/CommitCallback.java   | 3 +++
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d998b290dd..11f67a2c10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -357,7 +357,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
 
     private List<CommitCallback> createCommitCallbacks(String commitUser, 
FileStoreTable table) {
         List<CommitCallback> callbacks =
-                new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
+                new ArrayList<>(CallbackUtils.loadCommitCallbacks(options, 
table));
 
         if (options.partitionedTableInMetastore() && 
!schema.partitionKeys().isEmpty()) {
             PartitionHandler partitionHandler = 
catalogEnvironment.partitionHandler();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
index 7d8a0a849c..bec038718c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
@@ -32,8 +33,12 @@ public class CallbackUtils {
         return loadCallbacks(coreOptions.tagCallbacks(), TagCallback.class);
     }
 
-    public static List<CommitCallback> loadCommitCallbacks(CoreOptions 
coreOptions) {
-        return loadCallbacks(coreOptions.commitCallbacks(), 
CommitCallback.class);
+    public static List<CommitCallback> loadCommitCallbacks(
+            CoreOptions coreOptions, FileStoreTable table) {
+        List<CommitCallback> commitCallbacks =
+                loadCallbacks(coreOptions.commitCallbacks(), 
CommitCallback.class);
+        commitCallbacks.forEach(callback -> callback.setTable(table));
+        return commitCallbacks;
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index 40e615198e..22e3ce753e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
 
 import java.util.List;
 
@@ -44,4 +45,6 @@ public interface CommitCallback extends AutoCloseable {
             Snapshot snapshot);
 
     void retry(ManifestCommittable committable);
+
+    default void setTable(FileStoreTable table) {}
 }

Reply via email to