This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 47174b99d [flink] Check parallelism of committer operator while runtime. (#3237)
47174b99d is described below

commit 47174b99d2a7461482bf55f41723aff2638b099e
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 19 21:15:52 2024 +0800

    [flink] Check parallelism of committer operator while runtime. (#3237)
---
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  1 +
 .../paimon/flink/sink/CommitterOperator.java       | 11 ++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  1 +
 .../flink/sink/MultiTablesCompactorSink.java       |  1 +
 .../paimon/flink/sink/CommitterOperatorTest.java   | 33 +++++++++++++++++++++-
 .../paimon/flink/sink/StoreMultiCommitterTest.java |  2 ++
 6 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 9cba78f21..cff31087b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -127,6 +127,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
                                 typeInfo,
                                 new CommitterOperator<>(
                                         true,
+                                        false,
                                         commitUser,
                                         createCommitterFactory(),
                                         createCommittableStateManager()))
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index dd7dc05cf..7d637c3b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.utils.Preconditions;
+
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -55,6 +57,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
      */
     private final boolean streamingCheckpointEnabled;
 
+    /** Whether to check the parallelism while runtime. */
+    private final boolean forceSingleParallelism;
+
     /**
      * This commitUser is valid only for new jobs. After the job starts, this commitUser will be
      * recorded into the states of write and commit operators. When the job restarts, commitUser
@@ -83,10 +88,12 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
 
     public CommitterOperator(
             boolean streamingCheckpointEnabled,
+            boolean forceSingleParallelism,
             String initialCommitUser,
             Committer.Factory<CommitT, GlobalCommitT> committerFactory,
             CommittableStateManager<GlobalCommitT> committableStateManager) {
         this.streamingCheckpointEnabled = streamingCheckpointEnabled;
+        this.forceSingleParallelism = forceSingleParallelism;
         this.initialCommitUser = initialCommitUser;
         this.committablesPerCheckpoint = new TreeMap<>();
         this.committerFactory = checkNotNull(committerFactory);
@@ -98,6 +105,10 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
 
+        Preconditions.checkArgument(
+                !forceSingleParallelism || getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+                "Committer Operator parallelism in paimon MUST be one.");
+
         this.currentWatermark = Long.MIN_VALUE;
         this.endInput = false;
         // each job can only have one user name and this name must be consistent across restarts
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index fa4526897..7a52552ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -228,6 +228,7 @@ public abstract class FlinkSink<T> implements Serializable {
         OneInputStreamOperator<Committable, Committable> committerOperator =
                 new CommitterOperator<>(
                         streamingCheckpointEnabled,
+                        true,
                         commitUser,
                         createCommitterFactory(streamingCheckpointEnabled),
                         createCommittableStateManager());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index 7f70688ee..d638e1f69 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -128,6 +128,7 @@ public class MultiTablesCompactorSink implements Serializable {
                                 new MultiTableCommittableTypeInfo(),
                                 new CommitterOperator<>(
                                         streamingCheckpointEnabled,
+                                        false,
                                         commitUser,
                                         createCommitterFactory(),
                                         createCommittableStateManager()))
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 6cbb151e7..5a4329067 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -651,6 +652,19 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         write.close();
     }
 
+    @Test
+    public void testParallelism() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        String commitUser = UUID.randomUUID().toString();
+        OneInputStreamOperator<Committable, Committable> operator =
+                createCommitterOperator(table, commitUser, new NoopCommittableStateManager());
+        try (OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+                createTestHarness(operator, 10, 10, 3)) {
+            Assertions.assertThatCode(testHarness::open)
+                    .hasMessage("Committer Operator parallelism in paimon MUST be one.");
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Test utils
     // ------------------------------------------------------------------------
@@ -682,10 +696,25 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
 
     private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(
             OneInputStreamOperator<Committable, Committable> operator) throws Exception {
+        return createTestHarness(operator, 1, 1, 0);
+    }
+
+    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(
+            OneInputStreamOperator<Committable, Committable> operator,
+            int maxParallelism,
+            int parallelism,
+            int subTaskIndex)
+            throws Exception {
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<Committable, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, serializer);
+                new OneInputStreamOperatorTestHarness<>(
+                        operator,
+                        maxParallelism,
+                        parallelism,
+                        subTaskIndex,
+                        serializer,
+                        new OperatorID());
         harness.setup(serializer);
         return harness;
     }
@@ -695,6 +724,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
             String commitUser,
             CommittableStateManager<ManifestCommittable> committableStateManager) {
         return new CommitterOperator<>(
+                true,
                 true,
                 commitUser == null ? initialCommitUser : commitUser,
                 (user, metricGroup) ->
@@ -710,6 +740,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
             CommittableStateManager<ManifestCommittable> committableStateManager,
             ThrowingConsumer<StateInitializationContext, Exception> initializeFunction) {
         return new CommitterOperator<Committable, ManifestCommittable>(
+                true,
                 true,
                 commitUser == null ? initialCommitUser : commitUser,
                 (user, metricGroup) ->
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index cc7bb27a9..6b449ff0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -636,6 +636,7 @@ class StoreMultiCommitterTest {
         CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> operator =
                 new CommitterOperator<>(
                         true,
+                        false,
                         initialCommitUser,
                         (user, metricGroup) ->
                                 new StoreMultiCommitter(
@@ -652,6 +653,7 @@ class StoreMultiCommitterTest {
         CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> operator =
                 new CommitterOperator<>(
                         true,
+                        false,
                         initialCommitUser,
                         (user, metricGroup) ->
                                 new StoreMultiCommitter(

Reply via email to