This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new af7be8440f [Core] Custom commit callback support to be initialized with table (#6004)
af7be8440f is described below

commit af7be8440f504edac8ec097f926fe0b88af8301b
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Fri Aug 1 12:27:36 2025 +0800

    [Core] Custom commit callback support to be initialized with table (#6004)
---
 .../src/main/java/org/apache/paimon/AbstractFileStore.java | 2 +-
 .../main/java/org/apache/paimon/table/sink/CallbackUtils.java | 9 +++++++--
 .../main/java/org/apache/paimon/table/sink/CommitCallback.java | 3 +++
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d998b290dd..11f67a2c10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -357,7 +357,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
 
     private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreTable table) {
         List<CommitCallback> callbacks =
-                new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
+                new ArrayList<>(CallbackUtils.loadCommitCallbacks(options, table));
 
         if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
             PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
index 7d8a0a849c..bec038718c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
@@ -32,8 +33,12 @@ public class CallbackUtils {
         return loadCallbacks(coreOptions.tagCallbacks(), TagCallback.class);
     }
 
-    public static List<CommitCallback> loadCommitCallbacks(CoreOptions coreOptions) {
-        return loadCallbacks(coreOptions.commitCallbacks(), CommitCallback.class);
+    public static List<CommitCallback> loadCommitCallbacks(
+            CoreOptions coreOptions, FileStoreTable table) {
+        List<CommitCallback> commitCallbacks =
+                loadCallbacks(coreOptions.commitCallbacks(), CommitCallback.class);
+        commitCallbacks.forEach(callback -> callback.setTable(table));
+        return commitCallbacks;
     }
 
     @SuppressWarnings("unchecked")
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index 40e615198e..22e3ce753e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
 
 import java.util.List;
 
@@ -44,4 +45,6 @@ public interface CommitCallback extends AutoCloseable {
             Snapshot snapshot);
 
     void retry(ManifestCommittable committable);
+
+    default void setTable(FileStoreTable table) {}
 }