This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5749eafd0a [flink] StoreMultiCommitter support dynamic options per table (#7404)
5749eafd0a is described below

commit 5749eafd0a6946f4407cfaa717543c6612b1e26c
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 11 19:33:25 2026 +0800

    [flink] StoreMultiCommitter support dynamic options per table (#7404)
---
 .../paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java |  1 +
 .../apache/paimon/flink/sink/StoreMultiCommitter.java | 19 +++++++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c9ae95b698..cdf619725d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -170,6 +170,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
                         context,
                         false,
                         Collections.emptyMap(),
+                        null,
                         eagerInit,
                         tableFilter);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index d8dde60b31..ee2278807b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -28,6 +28,8 @@ import org.apache.paimon.table.sink.CommitMessage;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -56,6 +58,7 @@ public class StoreMultiCommitter
     // Currently, only compact_database job needs to ignore empty commit and set dynamic options
     private final boolean ignoreEmptyCommit;
     private final Map<String, String> dynamicOptions;
+    @Nullable private final Map<Identifier, Map<String, String>> dynamicOptionsPerTable;
 
     private final TableFilter tableFilter;
 
@@ -68,7 +71,7 @@ public class StoreMultiCommitter
             Context context,
             boolean ignoreEmptyCommit,
             Map<String, String> dynamicOptions) {
-        this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, false, null);
+        this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, null, false, null);
     }
 
     public StoreMultiCommitter(
@@ -76,12 +79,14 @@ public class StoreMultiCommitter
             Context context,
             boolean ignoreEmptyCommit,
             Map<String, String> dynamicOptions,
+            @Nullable Map<Identifier, Map<String, String>> dynamicOptionsPerTable,
             boolean eagerInit,
             TableFilter tableFilter) {
         this.catalog = catalogLoader.load();
         this.context = context;
         this.ignoreEmptyCommit = ignoreEmptyCommit;
         this.dynamicOptions = dynamicOptions;
+        this.dynamicOptionsPerTable = dynamicOptionsPerTable;
         this.tableCommitters = new HashMap<>();
 
         this.tableFilter = tableFilter;
@@ -213,7 +218,7 @@ public class StoreMultiCommitter
         if (committer == null) {
             FileStoreTable table;
             try {
-                table = (FileStoreTable) catalog.getTable(tableId).copy(dynamicOptions);
+                table = (FileStoreTable) catalog.getTable(tableId).copy(getDynamicOptions(tableId));
             } catch (Catalog.TableNotExistException e) {
                 throw new RuntimeException(
                         String.format(
@@ -232,6 +237,16 @@ public class StoreMultiCommitter
         return committer;
     }
 
+    private Map<String, String> getDynamicOptions(Identifier identifier) {
+        if (dynamicOptionsPerTable != null) {
+            Map<String, String> dynamicOptions = dynamicOptionsPerTable.get(identifier);
+            if (dynamicOptions != null) {
+                return dynamicOptions;
+            }
+        }
+        return this.dynamicOptions;
+    }
+
     @Override
     public void close() throws Exception {
         for (StoreCommitter committer : tableCommitters.values()) {

Reply via email to