This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fce5cb34aec [FLINK-37747][runtime] Use old subtask count for restored committable objects (#26518)
fce5cb34aec is described below

commit fce5cb34aec2f384b551677f69b272f9d36a7ca0
Author: David Wang <[email protected]>
AuthorDate: Thu May 8 16:27:48 2025 +1000

    [FLINK-37747][runtime] Use old subtask count for restored committable objects (#26518)
    
    
    ---------
    
    Co-authored-by: David Wang <[email protected]>
---
 .../runtime/operators/sink/CommitterOperator.java  |   7 +-
 .../sink/SinkV2CommitterOperatorTest.java          |  43 +++---
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 155 +++++++++++++++++++++
 3 files changed, 183 insertions(+), 22 deletions(-)
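
The one-line runtime change below addresses a count mismatch after rescaling: when a job recovers at a higher parallelism, only the subtasks that actually restored committable state re-emit a CommittableSummary for the old checkpoint, so stamping the summary with the new parallelism announces more summaries than will ever arrive. A minimal standalone sketch of the clamp, with plain ints standing in for Flink's TaskInfo and CheckpointCommittableManager (the class name is illustrative only):

    final class SubtaskCountClampSketch {
        public static void main(String[] args) {
            int currentParallelism = 10;  // parallelism after upscaling
            int restoredSubtaskCount = 1; // subtask count recorded at checkpoint time
            // Pre-fix the summary advertised 10, although only 1 summary is
            // re-emitted; the fix clamps the advertised count to match.
            int advertised = Math.min(currentParallelism, restoredSubtaskCount);
            System.out.println(advertised); // prints 1
        }
    }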

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 2f1cffcbcce..4a640fedf75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -183,7 +183,12 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
 
     private void emit(CheckpointCommittableManager<CommT> committableManager) {
         int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        // Ensure that numberOfSubtasks is in sync with the number of actually emitted
+        // CommittableSummaries during upscaling recovery (see FLINK-37747).
+        int numberOfSubtasks =
+                Math.min(
+                        getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+                        committableManager.getNumberOfSubtasks());
         long checkpointId = committableManager.getCheckpointId();
         Collection<CommT> committables = committableManager.getSuccessfulCommittables();
         output.collect(
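
Downstream, numberOfSubtasks presumably tells the committable collector how many summaries to await per checkpoint, which is why the pre-fix value can stall recovery indefinitely. A simplified, hypothetical view of that barrier (not Flink's actual collector logic):

    final class SummaryBarrierSketch {
        public static void main(String[] args) {
            int announced = 10; // numberOfSubtasks stamped on the summary (pre-fix)
            int received = 1;   // summaries actually re-emitted after upscaling 1 to 10
            // The checkpoint is only considered complete downstream once all
            // announced summaries have arrived, so the pre-fix value blocks forever.
            System.out.println("complete: " + (received >= announced)); // complete: false
        }
    }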
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 94a22fb68bf..03a0c791b3a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -36,6 +36,7 @@ import org.assertj.core.api.AbstractThrowableAssert;
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
@@ -131,7 +132,7 @@ class SinkV2CommitterOperatorTest {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0);
         testHarness.open();
         testHarness.setProcessingTime(0);
 
@@ -164,11 +165,13 @@ class SinkV2CommitterOperatorTest {
         testHarness.close();
     }
 
-    @Test
-    void testStateRestore() throws Exception {
+    @ParameterizedTest
+    @CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
+    void testStateRestoreWithScaling(
+            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
+            throws Exception {
 
         final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;
 
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
@@ -177,8 +180,8 @@ class SinkV2CommitterOperatorTest {
                                 sinkWithPostCommitWithRetry().sink,
                                 false,
                                 true,
-                                1,
-                                1,
+                                parallelismBeforeScaling,
+                                parallelismBeforeScaling,
                                 originalSubtaskId);
         testHarness.open();
 
@@ -187,7 +190,8 @@ class SinkV2CommitterOperatorTest {
         long checkpointId = 0L;
 
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId, parallelismBeforeScaling, checkpointId, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableWithLineage<String> first =
                 new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
@@ -195,7 +199,8 @@ class SinkV2CommitterOperatorTest {
 
         // another committable for the same checkpointId but from different subtask.
         final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId + 1, parallelismBeforeScaling, checkpointId, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
         final CommittableWithLineage<String> second =
                 new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
@@ -213,7 +218,12 @@ class SinkV2CommitterOperatorTest {
                         CommittableMessage<String>, CommittableMessage<String>>
                 restoredHarness =
                         createTestHarness(
-                                restored.sink, false, true, 10, 10, subtaskIdAfterRecovery);
+                                restored.sink,
+                                false,
+                                true,
+                                parallelismAfterScaling,
+                                parallelismAfterScaling,
+                                subtaskIdAfterRecovery);
 
         restoredHarness.initializeState(snapshot);
         restoredHarness.open();
@@ -226,7 +236,9 @@ class SinkV2CommitterOperatorTest {
                 records.element(0, as(committableSummary()))
                         .hasCheckpointId(checkpointId)
                         .hasFailedCommittables(0)
-                        .hasSubtaskId(subtaskIdAfterRecovery);
+                        .hasSubtaskId(subtaskIdAfterRecovery)
+                        .hasNumberOfSubtasks(
+                                Math.min(parallelismBeforeScaling, parallelismAfterScaling));
         objectCommittableSummaryAssert.hasOverallCommittables(2);
 
         // Expect the same checkpointId that the original snapshot was made with.
@@ -314,17 +326,6 @@ class SinkV2CommitterOperatorTest {
         }
     }
 
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
-    }
-
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
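
Spelled out, the three @CsvSource rows above (parallelismBeforeScaling, parallelismAfterScaling, subtaskIdAfterRecovery) all pin the restored summary to the clamped subtask count checked via hasNumberOfSubtasks. A small sketch tabulating the expected values (class name illustrative only):

    final class RestoredSummaryExpectations {
        public static void main(String[] args) {
            int[][] rows = {{1, 10, 9}, {2, 1, 0}, {2, 2, 1}};
            for (int[] r : rows) {
                // The restored summary carries the post-recovery subtask id but
                // min(parallelismBefore, parallelismAfter) as its subtask count.
                System.out.printf("subtaskId=%d, numberOfSubtasks=%d%n",
                        r[2], Math.min(r[0], r[1]));
            }
        }
    }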
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 88e89e3185e..cbae6342184 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -17,37 +17,61 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 import org.apache.flink.testutils.junit.SharedReference;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -125,6 +149,49 @@ public class SinkV2ITCase extends AbstractTestBase {
         return r.withValue(-r.getValue());
     }
 
+    @ParameterizedTest
+    @CsvSource({"1, 2", "2, 1", "1, 1"})
+    public void writerAndCommitterExecuteInStreamingModeWithScaling(
+            int initialParallelism,
+            int scaledParallelism,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final TrackingCommitter trackingCommitter = new TrackingCommitter(committed);
+        final Configuration config = createConfigForScalingTest(checkpointDir, initialParallelism);
+
+        // first run
+        final JobID jobID =
+                runStreamingWithScalingTest(
+                        config,
+                        initialParallelism,
+                        trackingCommitter,
+                        true,
+                        miniCluster,
+                        clusterClient);
+
+        // second run
+        config.set(StateRecoveryOptions.SAVEPOINT_PATH, getCheckpointPath(miniCluster, jobID));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+        runStreamingWithScalingTest(
+                config, initialParallelism, trackingCommitter, false, miniCluster, clusterClient);
+
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
+                        duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE));
+    }
+
+    private static List<Record<Integer>> duplicate(List<Record<Integer>> values) {
+        return IntStream.range(0, 2)
+                .boxed()
+                .flatMap(i -> values.stream())
+                .collect(Collectors.toList());
+    }
+
     @Test
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
@@ -184,6 +251,66 @@ public class SinkV2ITCase extends AbstractTestBase {
         return env;
     }
 
+    private Configuration createConfigForScalingTest(File checkpointDir, int parallelism) {
+        final Configuration config = new Configuration();
+        config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+        return config;
+    }
+
+    private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(Configuration config) {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+
+        return env;
+    }
+
+    private JobID runStreamingWithScalingTest(
+            Configuration config,
+            int parallelism,
+            TrackingCommitter trackingCommitter,
+            boolean shouldMapperFail,
+            MiniCluster miniCluster,
+            ClusterClient<?> clusterClient)
+            throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnvWithCheckpointDir(config);
+        final Source<Integer, ?, ?> source = createStreamingSource();
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                .rebalance()
+                .map(
+                        new FailingCheckpointMapper(
+                                SHARED_OBJECTS.add(new AtomicBoolean(!shouldMapperFail))))
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setCommitter(trackingCommitter, RecordSerializer::new)
+                                .setWithPostCommitTopology(true)
+                                .build());
+
+        final JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        clusterClient.requestJobResult(jobId).get();
+
+        return jobId;
+    }
+
+    private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        final Optional<String> completedCheckpoint =
+                CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster);
+
+        assertThat(completedCheckpoint).isPresent();
+        return completedCheckpoint.get();
+    }
+
     private StreamExecutionEnvironment buildBatchEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -245,4 +372,32 @@ public class SinkV2ITCase extends AbstractTestBase {
         @Override
         public void close() {}
     }
+
+    private static class FailingCheckpointMapper
+            implements MapFunction<Integer, Integer>, CheckpointListener {
+
+        private final SharedReference<AtomicBoolean> failed;
+        private long lastCheckpointId = 0;
+        private int emittedBetweenCheckpoint = 0;
+
+        FailingCheckpointMapper(SharedReference<AtomicBoolean> failed) {
+            this.failed = failed;
+        }
+
+        @Override
+        public Integer map(Integer value) {
+            if (lastCheckpointId >= 1 && emittedBetweenCheckpoint > 0 && !failed.get().get()) {
+                failed.get().set(true);
+                throw new RuntimeException("Planned exception.");
+            }
+            emittedBetweenCheckpoint++;
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            emittedBetweenCheckpoint = 0;
+        }
+    }
 }
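
FailingCheckpointMapper above fires its planned failure at most once: the shared AtomicBoolean is seeded with !shouldMapperFail, so only the first run can fail, and it does so on the second record after the first completed checkpoint, ensuring there is committed state to restore from. A standalone trace of the trigger condition (mirroring the mapper's fields with local variables):

    final class FailureTriggerTrace {
        public static void main(String[] args) {
            long lastCheckpointId = 0;
            int emittedBetweenCheckpoint = 0;
            boolean failed = false; // stands in for the shared AtomicBoolean

            emittedBetweenCheckpoint++;   // map(): no completed checkpoint yet, passes
            lastCheckpointId = 1;         // notifyCheckpointComplete(1)
            emittedBetweenCheckpoint = 0;
            emittedBetweenCheckpoint++;   // map(): counter was 0, passes
            // The next map() call sees a completed checkpoint and a non-zero counter:
            boolean wouldThrow =
                    lastCheckpointId >= 1 && emittedBetweenCheckpoint > 0 && !failed;
            System.out.println("throws planned exception: " + wouldThrow); // true
        }
    }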
