This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bd7436179d [flink] Fix StoreMultiCommitter with eager init mode (#5187)
bd7436179d is described below

commit bd7436179d384974bc20e0f5179fe1018327cd59
Author: YeJunHao <[email protected]>
AuthorDate: Fri Feb 28 23:07:03 2025 +0800

    [flink] Fix StoreMultiCommitter with eager init mode (#5187)
---
 .../java/org/apache/paimon/flink/sink/Committer.java   | 18 +++++++++++++++++-
 .../apache/paimon/flink/sink/CommitterOperator.java    |  7 ++++++-
 .../sink/MultiTableCommittableChannelComputer.java     |  9 ++++++---
 .../apache/paimon/flink/sink/StoreMultiCommitter.java  | 14 +++++++++++++-
 .../paimon/flink/sink/CommitterOperatorTest.java       |  4 +++-
 5 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 81c2f6b007..23c6c7faeb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -81,6 +81,10 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
         boolean isRestored();
 
         OperatorStateStore stateStore();
+
+        int getParallelism();
+
+        int getSubtaskIndex();
     }
 
     static Context createContext(
@@ -88,7 +92,9 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
             @Nullable OperatorMetricGroup metricGroup,
             boolean streamingCheckpointEnabled,
             boolean isRestored,
-            OperatorStateStore stateStore) {
+            OperatorStateStore stateStore,
+            int parallelism,
+            int subtaskIndex) {
         return new Committer.Context() {
             @Override
             public String commitUser() {
@@ -114,6 +120,16 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
             public OperatorStateStore stateStore() {
                 return stateStore;
             }
+
+            @Override
+            public int getParallelism() {
+                return parallelism;
+            }
+
+            @Override
+            public int getSubtaskIndex() {
+                return subtaskIndex;
+            }
         };
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 383cbcd6eb..4db63b4411 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -130,6 +130,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, initialCommitUser);
         // parallelism of commit operator is always 1, so commitUser will never be null
+        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+        int index = getRuntimeContext().getIndexOfThisSubtask();
+
         committer =
                 committerFactory.create(
                         Committer.createContext(
@@ -137,7 +140,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
                                 getMetricGroup(),
                                 streamingCheckpointEnabled,
                                 context.isRestored(),
-                                context.getOperatorStateStore()));
+                                context.getOperatorStateStore(),
+                                parallelism,
+                                index));
 
         committableStateManager.initializeState(context, committer);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java
index 405c6af271..dccc3b84d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java
@@ -37,9 +37,12 @@ public class MultiTableCommittableChannelComputer
 
     @Override
     public int channel(MultiTableCommittable multiTableCommittable) {
-        return Math.floorMod(
-                Objects.hash(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()),
-                numChannels);
+        return computeChannel(
+                multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), numChannels);
+    }
+
+    public static int computeChannel(String database, String table, int numChannels) {
+        return Math.floorMod(Objects.hash(database, table), numChannels);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 67b2b6bd46..8ad3e4fb08 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -85,9 +85,21 @@ public class StoreMultiCommitter
         this.tableCommitters = new HashMap<>();
 
         this.tableFilter = tableFilter;
+        int parallelism = context.getParallelism();
+        int index = context.getSubtaskIndex();
 
         if (eagerInit) {
-            List<Identifier> tableIds = filterTables();
+            List<Identifier> tableIds =
+                    filterTables().stream()
+                            .filter(
+                                    identifier ->
+                                            MultiTableCommittableChannelComputer.computeChannel(
+                                                            identifier.getDatabaseName(),
+                                                            identifier.getTableName(),
+                                                            parallelism)
+                                                    == index)
+                            .collect(Collectors.toList());
+
             tableIds.stream().forEach(this::getStoreCommitter);
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 1981abd373..9d3bc135e2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -597,7 +597,9 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
         StoreCommitter committer =
                 new StoreCommitter(
-                        table, commit, Committer.createContext("", metricGroup, true, false, null));
+                        table,
+                        commit,
+                        Committer.createContext("", metricGroup, true, false, null, 1, 1));
         committer.commit(Collections.singletonList(manifestCommittable));
         CommitterMetrics metrics = committer.getCommitterMetrics();
         assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);

Reply via email to