This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 5a151f6c49e [FLINK-37747][runtime] Use old subtask count for restored committable objects.
5a151f6c49e is described below

commit 5a151f6c49e2483ed225bf007e818f95c3a2eae8
Author: Diego de Souza <dieggol...@gmail.com>
AuthorDate: Fri Sep 19 14:41:17 2025 +0100

    [FLINK-37747][runtime] Use old subtask count for restored committable objects.
    
    * [FLINK-37747][runtime] Use old subtask count for restored committable objects
    
    * Trying to adapt test
    
    * Attempt to backport SinkV2ITCase from FLINK-37747
    
    * Add more test use cases for SinkV2ITCase
    
    * Fix SinkV2ITCase test
    
    * Setup TestSinkV2WithPostCommitTopology global committer
    
    * Rename test SinkV2ITCase streaming source to be more specific
    
    ---------
    
    Co-authored-by: Eric Nascimento <ericsn2...@gmail.com>
    Co-authored-by: David Wang <dwa...@atlassian.com>
---
 .../runtime/operators/sink/CommitterOperator.java  |   7 +-
 .../operators/sink/CommitterOperatorTestBase.java  |  42 ++---
 .../runtime/operators/sink/TestSinkV2.java         |   2 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 204 ++++++++++++++++++++-
 4 files changed, 227 insertions(+), 28 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 2f766a341cb..9fdff9b3688 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -180,7 +180,12 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
 
     private void emit(CheckpointCommittableManager<CommT> committableManager) {
         int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        // Ensure that numberOfSubtasks is in sync with the number of actually emitted
+        // CommittableSummaries during upscaling recovery (see FLINK-37747).
+        int numberOfSubtasks =
+                Math.min(
+                        getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+                        committableManager.getNumberOfSubtasks());
         long checkpointId = committableManager.getCheckpointId();
         Collection<CommT> committables = committableManager.getSuccessfulCommittables();
         output.collect(
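
For context on the hunk above: when a job is recovered with higher parallelism, only the original number of subtasks re-emits committables for the restored checkpoint, so the emitted CommittableSummary must advertise the restored subtask count rather than the new parallelism. A minimal standalone sketch of that mismatch, using hypothetical numbers that are not part of the patch:

    // Sketch only: why numberOfSubtasks is capped during upscaling recovery.
    public class UpscaleRecoveryExample {
        public static void main(String[] args) {
            int restoredSubtasks = 2;  // parallelism when the checkpoint was taken
            int currentSubtasks = 10;  // parallelism after rescaling up

            // Only 2 subtasks re-emit summaries for the restored committables;
            // advertising 10 would leave downstream consumers (e.g. a global
            // committer) waiting for 8 summaries that never arrive.
            int numberOfSubtasks = Math.min(currentSubtasks, restoredSubtasks);
            System.out.println(numberOfSubtasks); // prints 2
        }
    }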
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index ee58c8b94ac..58115a18b0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -31,6 +31,7 @@ import org.assertj.core.api.AbstractThrowableAssert;
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.function.IntSupplier;
@@ -91,7 +92,7 @@ abstract class CommitterOperatorTestBase {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0);
         testHarness.open();
         testHarness.setProcessingTime(0);
 
@@ -125,11 +126,13 @@ abstract class CommitterOperatorTestBase {
         testHarness.close();
     }
 
-    @Test
-    void testStateRestore() throws Exception {
+    @ParameterizedTest
+    @CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
+    void testStateRestoreWithScaling(
+            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
+            throws Exception {
 
         final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;
 
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
@@ -138,8 +141,8 @@ abstract class CommitterOperatorTestBase {
                                 sinkWithPostCommitWithRetry().sink,
                                 false,
                                 true,
-                                1,
-                                1,
+                                parallelismBeforeScaling,
+                                parallelismBeforeScaling,
                                 originalSubtaskId);
         testHarness.open();
 
@@ -148,7 +151,8 @@ abstract class CommitterOperatorTestBase {
         long checkpointId = 0L;
 
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId, parallelismBeforeScaling, checkpointId, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableWithLineage<String> first =
                 new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
@@ -156,7 +160,8 @@ abstract class CommitterOperatorTestBase {
 
         // another committable for the same checkpointId but from different subtask.
         final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId + 1, parallelismBeforeScaling, checkpointId, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
         final CommittableWithLineage<String> second =
                 new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
@@ -174,7 +179,12 @@ abstract class CommitterOperatorTestBase {
                         CommittableMessage<String>, CommittableMessage<String>>
                 restored =
                         createTestHarness(
-                                sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery);
+                                sinkAndCounters.sink,
+                                false,
+                                true,
+                                parallelismAfterScaling,
+                                parallelismAfterScaling,
+                                subtaskIdAfterRecovery);
 
         restored.initializeState(snapshot);
         restored.open();
@@ -188,7 +198,8 @@ abstract class CommitterOperatorTestBase {
                 .hasSubtaskId(subtaskIdAfterRecovery)
                 .hasFailedCommittables(0)
                 .hasOverallCommittables(2)
-                .hasPendingCommittables(0);
+                .hasPendingCommittables(0)
+                .hasNumberOfSubtasks(Math.min(parallelismBeforeScaling, parallelismAfterScaling));
 
         // Expect the same checkpointId that the original snapshot was made with.
         records.element(1, as(committableWithLineage()))
@@ -303,17 +314,6 @@ abstract class CommitterOperatorTestBase {
         testHarness.close();
     }
 
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
-    }
-
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
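
The three @CsvSource rows above exercise upscaling (1 -> 10, with the restored state assigned to subtask 9), downscaling (2 -> 1), and unchanged parallelism with a shifted subtask id (2 -> 2). An illustrative sketch, not part of the patch, of the numberOfSubtasks each case should observe on the emitted summary:

    // Sketch only: expected summary numberOfSubtasks per parameterized case.
    public class ScalingCasesExample {
        public static void main(String[] args) {
            // (parallelismBeforeScaling, parallelismAfterScaling, subtaskIdAfterRecovery)
            int[][] cases = {{1, 10, 9}, {2, 1, 0}, {2, 2, 1}};
            for (int[] c : cases) {
                int expected = Math.min(c[0], c[1]); // 1, 1, 2 respectively
                System.out.printf(
                        "before=%d after=%d subtask=%d -> numberOfSubtasks=%d%n",
                        c[0], c[1], c[2], expected);
            }
        }
    }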
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
index 0a34d157861..6e2c3bf2004 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -243,7 +243,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         @Override
         public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
             StandardSinkTopologies.addGlobalCommitter(
-                    committables, DefaultCommitter::new, this::getCommittableSerializer);
+                    committables, () -> createCommitter(null), this::getCommittableSerializer);
         }
     }
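
The one-line change above matters because a constructor reference always instantiates the base DefaultCommitter, whereas deferring to the overridable createCommitter(...) factory lets whatever committer the test configured also back the global committer. A self-contained sketch with hypothetical stand-in classes:

    import java.util.function.Supplier;

    // Sketch only: virtual dispatch through an overridable factory method.
    public class CommitterFactoryExample {
        static class BaseSink {
            String createCommitter(String properties) { return "DefaultCommitter"; }
            // Stands in for DefaultCommitter::new - pinned to the base committer:
            final Supplier<String> fixed = () -> "DefaultCommitter";
            // Mirrors the patched code - respects subclass overrides:
            final Supplier<String> viaFactory = () -> createCommitter(null);
        }
        static class CustomSink extends BaseSink {
            @Override String createCommitter(String properties) { return "CustomCommitter"; }
        }
        public static void main(String[] args) {
            System.out.println(new CustomSink().fixed.get());      // DefaultCommitter
            System.out.println(new CustomSink().viaFactory.get()); // CustomCommitter
        }
    }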
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 1695694a04e..92690b73f43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -17,39 +17,72 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommitter;
 import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.util.AbstractTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
 
 /**
  * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run time implementation.
  */
-public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
+public class SinkV2ITCase extends AbstractTestBase {
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
@@ -80,7 +113,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
             (BooleanSupplier & Serializable)
                     () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
 
-    @Before
+    @BeforeEach
     public void init() {
         COMMIT_QUEUE.clear();
     }
@@ -189,6 +222,76 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                                 .toArray()));
     }
 
+    @ParameterizedTest
+    @CsvSource({"1, 2", "2, 1", "1, 1"})
+    public void writerAndCommitterExecuteInStreamingModeWithScaling(
+            int initialParallelism,
+            int scaledParallelism,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        final DefaultCommitter committer =
+                new DefaultCommitter(
+                        (Supplier<Queue<Committer.CommitRequest<String>>> & Serializable)
+                                () -> COMMIT_QUEUE);
+        final Configuration config = createConfigForScalingTest(checkpointDir, initialParallelism);
+
+        // first run
+        final JobID jobID = runStreamingWithScalingTest(config, true, committer, clusterClient);
+
+        // second run
+        config.set(StateRecoveryOptions.SAVEPOINT_PATH, getCheckpointPath(miniCluster, jobID));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+        runStreamingWithScalingTest(config, false, committer, clusterClient);
+
+        assertThat(
+                COMMIT_QUEUE.stream()
+                        .map(Committer.CommitRequest::getCommittable)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE).toArray()));
+    }
+
+    private static List<String> duplicate(List<String> values) {
+        return IntStream.range(0, 2)
+                .boxed()
+                .flatMap(i -> values.stream())
+                .collect(Collectors.toList());
+    }
+
+    private JobID runStreamingWithScalingTest(
+            Configuration config,
+            boolean shouldMapperFail,
+            DefaultCommitter committer,
+            ClusterClient<?> clusterClient)
+            throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnvWithCheckpointDir(config);
+        final Source<Integer, ?, ?> source = createStreamingSourceForScalingTest();
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                .rebalance()
+                .map(new FailingCheckpointMapper(!shouldMapperFail))
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setCommitter(committer)
+                                .setWithPostCommitTopology(true)
+                                .build());
+
+        final JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        clusterClient.requestJobResult(jobId).get();
+
+        return jobId;
+    }
+
+    private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        final Optional<String> completedCheckpoint =
+                CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster);
+
+        assertThat(completedCheckpoint.isPresent(), is(true));
+        return completedCheckpoint.get();
+    }
+
     private StreamExecutionEnvironment buildStreamEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -196,12 +299,35 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
         return env;
     }
 
+    private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(Configuration config) {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+
+        return env;
+    }
+
     private StreamExecutionEnvironment buildBatchEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
 
+    private Configuration createConfigForScalingTest(File checkpointDir, int parallelism) {
+        final Configuration config = new Configuration();
+        config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+        return config;
+    }
+
     private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception {
         StreamGraph streamGraph = env.getStreamGraph();
         assertNoUnalignedCheckpointInSink(streamGraph);
@@ -227,4 +353,72 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                 .allMatch(StreamEdge::supportsUnalignedCheckpoints)
                 .isNotEmpty();
     }
+
+    /**
+     * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then waits
+     * for two more checkpoints to complete, 3) then re-emits the same elements before 4) waiting
+     * for another two checkpoints and 5) exiting.
+     */
+    private Source<Integer, ?, ?> createStreamingSourceForScalingTest() {
+        RateLimiterStrategy rateLimiterStrategy =
+                parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2);
+        return new DataGeneratorSource<>(
+                l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
+                SOURCE_DATA.size() * 2L,
+                rateLimiterStrategy,
+                IntegerTypeInfo.INT_TYPE_INFO);
+    }
+
+    private static class BurstingRateLimiter implements RateLimiter {
+        private final RateLimiter rateLimiter;
+        private final int numCheckpointCooldown;
+        private int cooldown;
+
+        public BurstingRateLimiter(int recordPerCycle, int numCheckpointCooldown) {
+            rateLimiter = new GatedRateLimiter(recordPerCycle);
+            this.numCheckpointCooldown = numCheckpointCooldown;
+        }
+
+        @Override
+        public CompletionStage<Void> acquire() {
+            CompletionStage<Void> stage = rateLimiter.acquire();
+            cooldown = numCheckpointCooldown;
+            return stage;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            if (cooldown-- <= 0) {
+                rateLimiter.notifyCheckpointComplete(checkpointId);
+            }
+        }
+    }
+
+    private static class FailingCheckpointMapper
+            implements MapFunction<Integer, Integer>, CheckpointListener {
+
+        private static final AtomicBoolean failed = new AtomicBoolean(false);
+        private long lastCheckpointId = 0;
+        private int emittedBetweenCheckpoint = 0;
+
+        FailingCheckpointMapper(boolean failed) {
+            FailingCheckpointMapper.failed.set(failed);
+        }
+
+        @Override
+        public Integer map(Integer value) {
+            if (lastCheckpointId >= 1 && emittedBetweenCheckpoint > 0 && !failed.get()) {
+                failed.set(true);
+                throw new RuntimeException("Planned exception.");
+            }
+            emittedBetweenCheckpoint++;
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            emittedBetweenCheckpoint = 0;
+        }
+    }
 }
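
The BurstingRateLimiter above emits a burst of records and then holds the gate closed until a fixed number of checkpoints has completed, which produces the emit/wait cycles described in its javadoc. A stripped-down, runnable sketch of that cooldown logic, with the Flink types replaced by plain Java so it stands alone:

    // Sketch only: the checkpoint-cooldown pattern used by BurstingRateLimiter.
    public class CooldownDemo {
        static final int NUM_CHECKPOINT_COOLDOWN = 2;
        static int cooldown;

        static void acquire() {
            cooldown = NUM_CHECKPOINT_COOLDOWN; // a burst of records was just emitted
        }

        static boolean reopensGate() {
            return cooldown-- <= 0; // mirrors notifyCheckpointComplete()
        }

        public static void main(String[] args) {
            acquire();
            for (long cp = 1; cp <= 4; cp++) {
                System.out.println("checkpoint " + cp + " reopens gate: " + reopensGate());
            }
            // Prints false, false, true, true: two quiet checkpoints must pass
            // before the wrapped GatedRateLimiter may emit the next burst.
        }
    }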
