This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new fce5cb34aec [FLINK-37747][runtime] Use old subtask count for restored committable objects (#26518)
fce5cb34aec is described below
commit fce5cb34aec2f384b551677f69b272f9d36a7ca0
Author: David Wang <[email protected]>
AuthorDate: Thu May 8 16:27:48 2025 +1000
    [FLINK-37747][runtime] Use old subtask count for restored committable objects (#26518)

    ---------

    Co-authored-by: David Wang <[email protected]>
---
 .../runtime/operators/sink/CommitterOperator.java  |   7 +-
 .../sink/SinkV2CommitterOperatorTest.java          |  43 +++---
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 155 +++++++++++++++++++++
 3 files changed, 183 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 2f1cffcbcce..4a640fedf75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -183,7 +183,12 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
 
     private void emit(CheckpointCommittableManager<CommT> committableManager) {
         int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        // Ensure that numberOfSubtasks is in sync with the number of actually emitted
+        // CommittableSummaries during upscaling recovery (see FLINK-37747).
+        int numberOfSubtasks =
+                Math.min(
+                        getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+                        committableManager.getNumberOfSubtasks());
         long checkpointId = committableManager.getCheckpointId();
         Collection<CommT> committables = committableManager.getSuccessfulCommittables();
         output.collect(
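The clamp above is the entire fix: after an upscaled restore, the restored CheckpointCommittableManager still carries the subtask count it was snapshotted with, and only that many CommittableSummaries are re-emitted, so the emitted summary must not advertise the new, larger parallelism. A standalone sketch of the rule, runnable outside Flink (SubtaskCountClampSketch and effectiveNumberOfSubtasks are illustrative names, not Flink API; the scenarios mirror the test rows further down):

    // Sketch only: the Math.min clamp from emit() above, isolated.
    public class SubtaskCountClampSketch {

        // Hypothetical helper: never advertise more subtasks than will
        // actually emit a CommittableSummary for the restored checkpoint.
        static int effectiveNumberOfSubtasks(int currentParallelism, int restoredNumberOfSubtasks) {
            return Math.min(currentParallelism, restoredNumberOfSubtasks);
        }

        public static void main(String[] args) {
            System.out.println(effectiveNumberOfSubtasks(10, 1)); // upscaling 1 -> 10: stays 1
            System.out.println(effectiveNumberOfSubtasks(1, 2));  // downscaling 2 -> 1: clamped to 1
            System.out.println(effectiveNumberOfSubtasks(2, 2));  // no rescaling: 2
        }
    }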
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 94a22fb68bf..03a0c791b3a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -36,6 +36,7 @@ import org.assertj.core.api.AbstractThrowableAssert;
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
@@ -131,7 +132,7 @@ class SinkV2CommitterOperatorTest {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0);
 
         testHarness.open();
         testHarness.setProcessingTime(0);
@@ -164,11 +165,13 @@
         testHarness.close();
     }
 
-    @Test
-    void testStateRestore() throws Exception {
+    @ParameterizedTest
+    @CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
+    void testStateRestoreWithScaling(
+            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
+            throws Exception {
         final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;
 
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
                 testHarness =
@@ -177,8 +180,8 @@
                                 sinkWithPostCommitWithRetry().sink,
                                 false,
                                 true,
-                                1,
-                                1,
+                                parallelismBeforeScaling,
+                                parallelismBeforeScaling,
                                 originalSubtaskId);
 
         testHarness.open();
@@ -187,7 +190,8 @@
 
         long checkpointId = 0L;
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId, parallelismBeforeScaling, checkpointId, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableWithLineage<String> first =
                 new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
@@ -195,7 +199,8 @@
 
         // another committable for the same checkpointId but from different subtask.
         final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId + 1, parallelismBeforeScaling, checkpointId, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
         final CommittableWithLineage<String> second =
                 new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
@@ -213,7 +218,12 @@
                         CommittableMessage<String>, CommittableMessage<String>>
                 restoredHarness =
                         createTestHarness(
-                                restored.sink, false, true, 10, 10, subtaskIdAfterRecovery);
+                                restored.sink,
+                                false,
+                                true,
+                                parallelismAfterScaling,
+                                parallelismAfterScaling,
+                                subtaskIdAfterRecovery);
         restoredHarness.initializeState(snapshot);
         restoredHarness.open();
 
@@ -226,7 +236,9 @@
         records.element(0, as(committableSummary()))
                 .hasCheckpointId(checkpointId)
                 .hasFailedCommittables(0)
-                .hasSubtaskId(subtaskIdAfterRecovery);
+                .hasSubtaskId(subtaskIdAfterRecovery)
+                .hasNumberOfSubtasks(
+                        Math.min(parallelismBeforeScaling, parallelismAfterScaling));
         objectCommittableSummaryAssert.hasOverallCommittables(2);
 
         // Expect the same checkpointId that the original snapshot was made with.
@@ -314,17 +326,6 @@ class SinkV2CommitterOperatorTest {
         }
     }
 
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
-    }
-
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
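Each @CsvSource row above reads (parallelismBeforeScaling, parallelismAfterScaling, subtaskIdAfterRecovery), and the restored summary is asserted to report Math.min of the two parallelisms. A small self-contained sketch that tabulates the expectation for exactly those rows (illustrative code, not part of the test):

    // Sketch only: expected numberOfSubtasks for the parameterized rows above.
    public class ExpectedSubtasksSketch {
        public static void main(String[] args) {
            int[][] rows = {{1, 10, 9}, {2, 1, 0}, {2, 2, 1}};
            for (int[] row : rows) {
                int before = row[0], after = row[1], recoveredSubtaskId = row[2];
                System.out.printf(
                        "before=%d after=%d subtask=%d -> numberOfSubtasks=%d%n",
                        before, after, recoveredSubtaskId, Math.min(before, after));
            }
        }
    }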
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 88e89e3185e..cbae6342184 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -17,37 +17,61 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 import org.apache.flink.testutils.junit.SharedReference;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -125,6 +149,49 @@ public class SinkV2ITCase extends AbstractTestBase {
         return r.withValue(-r.getValue());
     }
 
+    @ParameterizedTest
+    @CsvSource({"1, 2", "2, 1", "1, 1"})
+    public void writerAndCommitterExecuteInStreamingModeWithScaling(
+            int initialParallelism,
+            int scaledParallelism,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final TrackingCommitter trackingCommitter = new TrackingCommitter(committed);
+        final Configuration config = createConfigForScalingTest(checkpointDir, initialParallelism);
+
+        // first run
+        final JobID jobID =
+                runStreamingWithScalingTest(
+                        config,
+                        initialParallelism,
+                        trackingCommitter,
+                        true,
+                        miniCluster,
+                        clusterClient);
+
+        // second run
+        config.set(StateRecoveryOptions.SAVEPOINT_PATH, getCheckpointPath(miniCluster, jobID));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+        runStreamingWithScalingTest(
+                config, initialParallelism, trackingCommitter, false, miniCluster, clusterClient);
+
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
+                        duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE));
+    }
+
+    private static List<Record<Integer>> duplicate(List<Record<Integer>> values) {
+        return IntStream.range(0, 2)
+                .boxed()
+                .flatMap(i -> values.stream())
+                .collect(Collectors.toList());
+    }
+
     @Test
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
@@ -184,6 +251,66 @@ public class SinkV2ITCase extends AbstractTestBase {
         return env;
     }
 
+    private Configuration createConfigForScalingTest(File checkpointDir, int parallelism) {
+        final Configuration config = new Configuration();
+        config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+        return config;
+    }
+
+    private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(Configuration config) {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+
+        return env;
+    }
+
+    private JobID runStreamingWithScalingTest(
+            Configuration config,
+            int parallelism,
+            TrackingCommitter trackingCommitter,
+            boolean shouldMapperFail,
+            MiniCluster miniCluster,
+            ClusterClient<?> clusterClient)
+            throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnvWithCheckpointDir(config);
+        final Source<Integer, ?, ?> source = createStreamingSource();
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                .rebalance()
+                .map(
+                        new FailingCheckpointMapper(
+                                SHARED_OBJECTS.add(new AtomicBoolean(!shouldMapperFail))))
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setCommitter(trackingCommitter, RecordSerializer::new)
+                                .setWithPostCommitTopology(true)
+                                .build());
+
+        final JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        clusterClient.requestJobResult(jobId).get();
+
+        return jobId;
+    }
+
+    private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        final Optional<String> completedCheckpoint =
+                CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster);
+
+        assertThat(completedCheckpoint).isPresent();
+        return completedCheckpoint.get();
+    }
+
     private StreamExecutionEnvironment buildBatchEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -245,4 +372,32 @@
         @Override
         public void close() {}
     }
+
+    private static class FailingCheckpointMapper
+            implements MapFunction<Integer, Integer>, CheckpointListener {
+
+        private final SharedReference<AtomicBoolean> failed;
+        private long lastCheckpointId = 0;
+        private int emittedBetweenCheckpoint = 0;
+
+        FailingCheckpointMapper(SharedReference<AtomicBoolean> failed) {
+            this.failed = failed;
+        }
+
+        @Override
+        public Integer map(Integer value) {
+            if (lastCheckpointId >= 1 && emittedBetweenCheckpoint > 0 && !failed.get().get()) {
+                failed.get().set(true);
+                throw new RuntimeException("Planned exception.");
+            }
+            emittedBetweenCheckpoint++;
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            emittedBetweenCheckpoint = 0;
+        }
+    }
 }
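The scaling ITCase asserts against duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE): an exact multiset in which every expected record occurs twice, once per job run. A self-contained sketch of that helper's behavior (DuplicateExpectationSketch is an illustrative name mirroring the duplicate() method added above; the List.of values are arbitrary):

    // Sketch only: duplicate() concatenates the expected list with itself,
    // so the assertion demands each record be committed exactly twice
    // across the failing first run and the restored second run.
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class DuplicateExpectationSketch {
        static <T> List<T> duplicate(List<T> values) {
            return IntStream.range(0, 2)
                    .boxed()
                    .flatMap(i -> values.stream())
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            System.out.println(duplicate(List.of(1, 2, 3))); // [1, 2, 3, 1, 2, 3]
        }
    }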