This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 47174b99d [flink] Check parallelism of committer operator while runtime. (#3237)
47174b99d is described below
commit 47174b99d2a7461482bf55f41723aff2638b099e
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 19 21:15:52 2024 +0800
[flink] Check parallelism of committer operator while runtime. (#3237)
---
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 1 +
.../paimon/flink/sink/CommitterOperator.java | 11 ++++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 1 +
.../flink/sink/MultiTablesCompactorSink.java | 1 +
.../paimon/flink/sink/CommitterOperatorTest.java | 33 +++++++++++++++++++++-
.../paimon/flink/sink/StoreMultiCommitterTest.java | 2 ++
6 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 9cba78f21..cff31087b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -127,6 +127,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
typeInfo,
new CommitterOperator<>(
true,
+ false,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index dd7dc05cf..7d637c3b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.utils.Preconditions;
+
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -55,6 +57,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
*/
private final boolean streamingCheckpointEnabled;
+ /** Whether initializeState rejects (throws) when this operator's parallelism is not 1. */
+ private final boolean forceSingleParallelism;
+
/**
* This commitUser is valid only for new jobs. After the job starts, this
commitUser will be
* recorded into the states of write and commit operators. When the job
restarts, commitUser
@@ -83,10 +88,12 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
public CommitterOperator(
boolean streamingCheckpointEnabled,
+ boolean forceSingleParallelism,
String initialCommitUser,
Committer.Factory<CommitT, GlobalCommitT> committerFactory,
CommittableStateManager<GlobalCommitT> committableStateManager) {
this.streamingCheckpointEnabled = streamingCheckpointEnabled;
+ this.forceSingleParallelism = forceSingleParallelism;
this.initialCommitUser = initialCommitUser;
this.committablesPerCheckpoint = new TreeMap<>();
this.committerFactory = checkNotNull(committerFactory);
@@ -98,6 +105,10 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
+ Preconditions.checkArgument(
+ !forceSingleParallelism ||
getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+ "Committer Operator parallelism in paimon MUST be one.");
+
this.currentWatermark = Long.MIN_VALUE;
this.endInput = false;
// each job can only have one user name and this name must be
consistent across restarts
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index fa4526897..7a52552ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -228,6 +228,7 @@ public abstract class FlinkSink<T> implements Serializable {
OneInputStreamOperator<Committable, Committable> committerOperator =
new CommitterOperator<>(
streamingCheckpointEnabled,
+ true,
commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index 7f70688ee..d638e1f69 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -128,6 +128,7 @@ public class MultiTablesCompactorSink implements Serializable {
new MultiTableCommittableTypeInfo(),
new CommitterOperator<>(
streamingCheckpointEnabled,
+ false,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 6cbb151e7..5a4329067 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -651,6 +652,19 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
write.close();
}
+ @Test
+ public void testParallelism() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ String commitUser = UUID.randomUUID().toString();
+ OneInputStreamOperator<Committable, Committable> operator =
+ createCommitterOperator(table, commitUser, new NoopCommittableStateManager());
+ try (OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createTestHarness(operator, 10, 10, 3)) { // forceSingleParallelism=true, parallelism 10: open() must fail
+ Assertions.assertThatThrownBy(testHarness::open)
+ .hasMessage("Committer Operator parallelism in paimon MUST be one.");
+ }
+ }
+
// ------------------------------------------------------------------------
// Test utils
// ------------------------------------------------------------------------
@@ -682,10 +696,25 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
private OneInputStreamOperatorTestHarness<Committable, Committable>
createTestHarness(
OneInputStreamOperator<Committable, Committable> operator) throws
Exception {
+ return createTestHarness(operator, 1, 1, 0); // defaults: maxParallelism 1, parallelism 1, subtask index 0
+ }
+
+ private OneInputStreamOperatorTestHarness<Committable, Committable>
createTestHarness(
+ OneInputStreamOperator<Committable, Committable> operator,
+ int maxParallelism,
+ int parallelism,
+ int subTaskIndex)
+ throws Exception {
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
OneInputStreamOperatorTestHarness<Committable, Committable> harness =
- new OneInputStreamOperatorTestHarness<>(operator, serializer);
+ new OneInputStreamOperatorTestHarness<>( // forwards the three parallelism arguments plus a fresh OperatorID
+ operator,
+ maxParallelism,
+ parallelism,
+ subTaskIndex,
+ serializer,
+ new OperatorID());
harness.setup(serializer);
return harness;
}
@@ -695,6 +724,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
String commitUser,
CommittableStateManager<ManifestCommittable>
committableStateManager) {
return new CommitterOperator<>(
+ true,
true,
commitUser == null ? initialCommitUser : commitUser,
(user, metricGroup) ->
@@ -710,6 +740,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
CommittableStateManager<ManifestCommittable>
committableStateManager,
ThrowingConsumer<StateInitializationContext, Exception>
initializeFunction) {
return new CommitterOperator<Committable, ManifestCommittable>(
+ true,
true,
commitUser == null ? initialCommitUser : commitUser,
(user, metricGroup) ->
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index cc7bb27a9..6b449ff0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -636,6 +636,7 @@ class StoreMultiCommitterTest {
CommitterOperator<MultiTableCommittable, WrappedManifestCommittable>
operator =
new CommitterOperator<>(
true,
+ false,
initialCommitUser,
(user, metricGroup) ->
new StoreMultiCommitter(
@@ -652,6 +653,7 @@ class StoreMultiCommitterTest {
CommitterOperator<MultiTableCommittable, WrappedManifestCommittable>
operator =
new CommitterOperator<>(
true,
+ false,
initialCommitUser,
(user, metricGroup) ->
new StoreMultiCommitter(