This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 5a151f6c49e  [FLINK-37747][runtime] Use old subtask count for restored committable objects.
5a151f6c49e is described below

commit 5a151f6c49e2483ed225bf007e818f95c3a2eae8
Author: Diego de Souza <dieggol...@gmail.com>
AuthorDate: Fri Sep 19 14:41:17 2025 +0100

    [FLINK-37747][runtime] Use old subtask count for restored committable objects.

    * [FLINK-37747][runtime] Use old subtask count for restored committable objects

    * trying to adapt test

    * Attempt to backport SinkV2ITCase from FLINK-37747

    * Add more test use cases for SinkV2ITCase

    * Fix SinkV2ITCase test

    * Setup TestSinkV2WithPostCommitTopology global committer

    * Rename test SinkV2ITCase streaming source to be more specific

    ---------

    Co-authored-by: Eric Nascimento <ericsn2...@gmail.com>
    Co-authored-by: David Wang <dwa...@atlassian.com>
---
 .../runtime/operators/sink/CommitterOperator.java  |   7 +-
 .../operators/sink/CommitterOperatorTestBase.java  |  42 ++---
 .../runtime/operators/sink/TestSinkV2.java         |   2 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 204 ++++++++++++++++++++-
 4 files changed, 227 insertions(+), 28 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 2f766a341cb..9fdff9b3688 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -180,7 +180,12 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage

     private void emit(CheckpointCommittableManager<CommT> committableManager) {
         int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        // Ensure that numberOfSubtasks is in sync with the number of actually emitted
+        // CommittableSummaries during upscaling recovery (see FLINK-37747).
+        int numberOfSubtasks =
+                Math.min(
+                        getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+                        committableManager.getNumberOfSubtasks());
         long checkpointId = committableManager.getCheckpointId();
         Collection<CommT> committables = committableManager.getSuccessfulCommittables();
         output.collect(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index ee58c8b94ac..58115a18b0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -31,6 +31,7 @@ import org.assertj.core.api.AbstractThrowableAssert;
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;

 import java.util.function.IntSupplier;
@@ -91,7 +92,7 @@ abstract class CommitterOperatorTestBase {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0);
         testHarness.open();

         testHarness.setProcessingTime(0);
@@ -125,11 +126,13 @@ abstract class CommitterOperatorTestBase {
         testHarness.close();
     }

-    @Test
-    void testStateRestore() throws Exception {
+    @ParameterizedTest
+    @CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
+    void testStateRestoreWithScaling(
+            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
+            throws Exception {

         final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;

         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
@@ -138,8 +141,8 @@ abstract class CommitterOperatorTestBase {
                                 sinkWithPostCommitWithRetry().sink,
                                 false,
                                 true,
-                                1,
-                                1,
+                                parallelismBeforeScaling,
+                                parallelismBeforeScaling,
                                 originalSubtaskId);
         testHarness.open();
@@ -148,7 +151,8 @@ abstract class CommitterOperatorTestBase {
         long checkpointId = 0L;
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId, parallelismBeforeScaling, checkpointId, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableWithLineage<String> first =
                 new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);

         // another committable for the same checkpointId but from different subtask.
         final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId + 1, parallelismBeforeScaling, checkpointId, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
         final CommittableWithLineage<String> second =
                 new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
@@ -174,7 +179,12 @@ abstract class CommitterOperatorTestBase {
                         CommittableMessage<String>, CommittableMessage<String>>
                 restored =
                         createTestHarness(
-                                sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery);
+                                sinkAndCounters.sink,
+                                false,
+                                true,
+                                parallelismAfterScaling,
+                                parallelismAfterScaling,
+                                subtaskIdAfterRecovery);
         restored.initializeState(snapshot);
         restored.open();
@@ -188,7 +198,8 @@ abstract class CommitterOperatorTestBase {
                 .hasSubtaskId(subtaskIdAfterRecovery)
                 .hasFailedCommittables(0)
                 .hasOverallCommittables(2)
-                .hasPendingCommittables(0);
+                .hasPendingCommittables(0)
+                .hasNumberOfSubtasks(Math.min(parallelismBeforeScaling, parallelismAfterScaling));

         // Expect the same checkpointId that the original snapshot was made with.
         records.element(1, as(committableWithLineage()))
@@ -303,17 +314,6 @@ abstract class CommitterOperatorTestBase {
         testHarness.close();
     }

-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
-    }
-
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
index 0a34d157861..6e2c3bf2004 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -243,7 +243,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         @Override
         public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
             StandardSinkTopologies.addGlobalCommitter(
-                    committables, DefaultCommitter::new, this::getCommittableSerializer);
+                    committables, () -> createCommitter(null), this::getCommittableSerializer);
         }
     }

diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 1695694a04e..92690b73f43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -17,39 +17,72 @@
 package org.apache.flink.test.streaming.runtime;

+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommitter;
 import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.util.AbstractTestBase;

-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;

+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;

 /**
  * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run time implementation.
  */
-public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
+public class SinkV2ITCase extends AbstractTestBase {
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
@@ -80,7 +113,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
             (BooleanSupplier & Serializable)
                     () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;

-    @Before
+    @BeforeEach
     public void init() {
         COMMIT_QUEUE.clear();
     }
@@ -189,6 +222,76 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                                 .toArray()));
     }

+    @ParameterizedTest
+    @CsvSource({"1, 2", "2, 1", "1, 1"})
+    public void writerAndCommitterExecuteInStreamingModeWithScaling(
+            int initialParallelism,
+            int scaledParallelism,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        final DefaultCommitter committer =
+                new DefaultCommitter(
+                        (Supplier<Queue<Committer.CommitRequest<String>>> & Serializable)
+                                () -> COMMIT_QUEUE);
+        final Configuration config = createConfigForScalingTest(checkpointDir, initialParallelism);
+
+        // first run
+        final JobID jobID = runStreamingWithScalingTest(config, true, committer, clusterClient);
+
+        // second run
+        config.set(StateRecoveryOptions.SAVEPOINT_PATH, getCheckpointPath(miniCluster, jobID));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+        runStreamingWithScalingTest(config, false, committer, clusterClient);
+
+        assertThat(
+                COMMIT_QUEUE.stream()
+                        .map(Committer.CommitRequest::getCommittable)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE).toArray()));
+    }
+
+    private static List<String> duplicate(List<String> values) {
+        return IntStream.range(0, 2)
+                .boxed()
+                .flatMap(i -> values.stream())
+                .collect(Collectors.toList());
+    }
+
+    private JobID runStreamingWithScalingTest(
+            Configuration config,
+            boolean shouldMapperFail,
+            DefaultCommitter committer,
+            ClusterClient<?> clusterClient)
+            throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnvWithCheckpointDir(config);
+        final Source<Integer, ?, ?> source = createStreamingSourceForScalingTest();
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                .rebalance()
+                .map(new FailingCheckpointMapper(!shouldMapperFail))
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setCommitter(committer)
+                                .setWithPostCommitTopology(true)
+                                .build());
+
+        final JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        clusterClient.requestJobResult(jobId).get();
+
+        return jobId;
+    }
+
+    private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        final Optional<String> completedCheckpoint =
+                CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster);
+
+        assertThat(completedCheckpoint.isPresent(), is(true));
+        return completedCheckpoint.get();
+    }
+
     private StreamExecutionEnvironment buildStreamEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -196,12 +299,35 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
         return env;
     }

+    private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(Configuration config) {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+
+        return env;
+    }
+
     private StreamExecutionEnvironment buildBatchEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }

+    private Configuration createConfigForScalingTest(File checkpointDir, int parallelism) {
+        final Configuration config = new Configuration();
+        config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+        return config;
+    }
+
     private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception {
         StreamGraph streamGraph = env.getStreamGraph();
         assertNoUnalignedCheckpointInSink(streamGraph);
@@ -227,4 +353,72 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                 .allMatch(StreamEdge::supportsUnalignedCheckpoints)
                 .isNotEmpty();
     }
+
+    /**
+     * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then
+     * waits for two more checkpoints to complete, 3) then re-emits the same elements before 4)
+     * waiting for another two checkpoints and 5) exiting.
+     */
+    private Source<Integer, ?, ?> createStreamingSourceForScalingTest() {
+        RateLimiterStrategy rateLimiterStrategy =
+                parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2);
+        return new DataGeneratorSource<>(
+                l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
+                SOURCE_DATA.size() * 2L,
+                rateLimiterStrategy,
+                IntegerTypeInfo.INT_TYPE_INFO);
+    }
+
+    private static class BurstingRateLimiter implements RateLimiter {
+        private final RateLimiter rateLimiter;
+        private final int numCheckpointCooldown;
+        private int cooldown;
+
+        public BurstingRateLimiter(int recordPerCycle, int numCheckpointCooldown) {
+            rateLimiter = new GatedRateLimiter(recordPerCycle);
+            this.numCheckpointCooldown = numCheckpointCooldown;
+        }
+
+        @Override
+        public CompletionStage<Void> acquire() {
+            CompletionStage<Void> stage = rateLimiter.acquire();
+            cooldown = numCheckpointCooldown;
+            return stage;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            if (cooldown-- <= 0) {
+                rateLimiter.notifyCheckpointComplete(checkpointId);
+            }
+        }
+    }
+
+    private static class FailingCheckpointMapper
+            implements MapFunction<Integer, Integer>, CheckpointListener {
+
+        private static final AtomicBoolean failed = new AtomicBoolean(false);
+        private long lastCheckpointId = 0;
+        private int emittedBetweenCheckpoint = 0;
+
+        FailingCheckpointMapper(boolean failed) {
+            FailingCheckpointMapper.failed.set(failed);
+        }
+
+        @Override
+        public Integer map(Integer value) {
+            if (lastCheckpointId >= 1 && emittedBetweenCheckpoint > 0 && !failed.get()) {
+                failed.set(true);
+                throw new RuntimeException("Planned exception.");
+            }
+            emittedBetweenCheckpoint++;
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            emittedBetweenCheckpoint = 0;
+        }
+    }
 }
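Editor's note: the behavioral core of this commit is the clamp in CommitterOperator#emit. After recovery, restored committable state still carries the pre-rescale subtask count, so emitting a CommittableSummary that announces the new, larger parallelism would make the post-commit (global committer) topology wait for summaries from subtasks that will never send any. Below is a minimal standalone sketch of that arithmetic; it is not part of the commit, and SubtaskCountClampSketch, emittedNumberOfSubtasks, and its parameters are illustrative stand-ins for the values the operator reads from TaskInfo#getNumberOfParallelSubtasks() and CheckpointCommittableManager#getNumberOfSubtasks().

public class SubtaskCountClampSketch {

    // Mirrors the Math.min(...) introduced in CommitterOperator#emit by this commit.
    static int emittedNumberOfSubtasks(int currentParallelism, int restoredNumberOfSubtasks) {
        return Math.min(currentParallelism, restoredNumberOfSubtasks);
    }

    public static void main(String[] args) {
        // Upscaling 1 -> 2: only the one restored subtask re-emits a summary for the
        // old checkpoint, so announcing 2 would leave downstream waiting forever.
        System.out.println(emittedNumberOfSubtasks(2, 1)); // prints 1

        // Downscaling 2 -> 1: a single surviving subtask emits a single summary,
        // matching the current parallelism.
        System.out.println(emittedNumberOfSubtasks(1, 2)); // prints 1

        // No rescaling: both counts agree and behavior is unchanged.
        System.out.println(emittedNumberOfSubtasks(2, 2)); // prints 2
    }
}

These are the same classes of scenario (upscale, downscale, unchanged) that the parameterized tests above pin down: the "1, 10, 9", "2, 1, 0", and "2, 2, 1" cases in CommitterOperatorTestBase, and the 1->2, 2->1, and 1->1 runs in SinkV2ITCase.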