This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5749eafd0a [flink] StoreMultiCommitter support dynamic options per
table (#7404)
5749eafd0a is described below
commit 5749eafd0a6946f4407cfaa717543c6612b1e26c
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 11 19:33:25 2026 +0800
[flink] StoreMultiCommitter support dynamic options per table (#7404)
---
.../paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java | 1 +
.../apache/paimon/flink/sink/StoreMultiCommitter.java | 19 +++++++++++++++++--
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c9ae95b698..cdf619725d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -170,6 +170,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
context,
false,
Collections.emptyMap(),
+ null,
eagerInit,
tableFilter);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index d8dde60b31..ee2278807b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -28,6 +28,8 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.flink.api.java.tuple.Tuple2;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -56,6 +58,7 @@ public class StoreMultiCommitter
// Currently, only compact_database job needs to ignore empty commit and
set dynamic options
private final boolean ignoreEmptyCommit;
private final Map<String, String> dynamicOptions;
+ @Nullable private final Map<Identifier, Map<String, String>>
dynamicOptionsPerTable;
private final TableFilter tableFilter;
@@ -68,7 +71,7 @@ public class StoreMultiCommitter
Context context,
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions) {
- this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, false,
null);
+ this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, null,
false, null);
}
public StoreMultiCommitter(
@@ -76,12 +79,14 @@ public class StoreMultiCommitter
Context context,
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions,
+ @Nullable Map<Identifier, Map<String, String>>
dynamicOptionsPerTable,
boolean eagerInit,
TableFilter tableFilter) {
this.catalog = catalogLoader.load();
this.context = context;
this.ignoreEmptyCommit = ignoreEmptyCommit;
this.dynamicOptions = dynamicOptions;
+ this.dynamicOptionsPerTable = dynamicOptionsPerTable;
this.tableCommitters = new HashMap<>();
this.tableFilter = tableFilter;
@@ -213,7 +218,7 @@ public class StoreMultiCommitter
if (committer == null) {
FileStoreTable table;
try {
- table = (FileStoreTable)
catalog.getTable(tableId).copy(dynamicOptions);
+ table = (FileStoreTable)
catalog.getTable(tableId).copy(getDynamicOptions(tableId));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(
String.format(
@@ -232,6 +237,16 @@ public class StoreMultiCommitter
return committer;
}
+ private Map<String, String> getDynamicOptions(Identifier identifier) {
+ if (dynamicOptionsPerTable != null) {
+ Map<String, String> dynamicOptions =
dynamicOptionsPerTable.get(identifier);
+ if (dynamicOptions != null) {
+ return dynamicOptions;
+ }
+ }
+ return this.dynamicOptions;
+ }
+
@Override
public void close() throws Exception {
for (StoreCommitter committer : tableCommitters.values()) {