XComp commented on code in PR #27921:
URL: https://github.com/apache/flink/pull/27921#discussion_r3225990963


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -2000,17 +2001,21 @@ public long getCheckpointTimeout() {
     }
 
     /**
-     * Returns {@code -1} if a checkpoint is already in flight, otherwise the 
remaining time (in ms)
-     * until {@code minPauseBetweenCheckpoints} is satisfied ({@code 0} = 
trigger now). All checks
-     * are made under the coordinator lock.
+     * Returns {@link Optional#empty()} if a checkpoint is already in flight 
(triggering or
+     * pending). Otherwise returns the remaining {@link Duration} until {@code

Review Comment:
   Usually, you would describe the actual return value (the remaining 
`Duration`) first and the fallback (`Optional#empty()`) afterwards.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +190,111 @@ public ScheduledFuture<?> scheduleOperation(Runnable 
callback, Duration delay) {
         return context.runIfState(this, callback, delay);
     }
 
+    @Override
+    public void requestActiveCheckpointTrigger() {
+        if (!activeCheckpointTriggerEnabled) {
+            return;
+        }
+
+        final CheckpointCoordinator checkpointCoordinator =
+                getExecutionGraph().getCheckpointCoordinator();
+        if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+            return;
+        }
+
+        final Optional<Duration> triggerDelay =
+                checkpointCoordinator.getActiveCheckpointTriggerDelay();
+        if (triggerDelay.isEmpty()) {
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: 
checkpoint already in progress.");
+            return;
+        }
+        scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+    }
+
+    private boolean shouldTriggerActiveCheckpoint(

Review Comment:
   All calls of this method negate the result. We could rename the method to 
`skipActiveCheckpointTriggering` and invert its logic to avoid the negations.



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -745,6 +745,24 @@ public InlineElement getDescription() {
                                                             .key()))
                                     .build());
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> 
SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
+            
key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "When enabled, the Adaptive 
Scheduler actively triggers a checkpoint when resources change and rescaling is 
desired, "
+                                                    + "rather than waiting for 
the next periodic checkpoint. "
+                                                    + "This reduces rescaling 
latency, especially when checkpoint intervals are large. "
+                                                    + "The active trigger 
respects min-pause and "

Review Comment:
   Why did we remove the reference to the `min-pause` parameter? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
         }
     }
 
+    @Test
+    void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    
.CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("A retry should be scheduled while min-pause is not 
satisfied")
+                    .isNotEmpty();
+
+            ctx.setExpectStopWithSavepoint(assertNonNull());
+            exec.stopWithSavepoint("file:///tmp/target", true, 
SavepointFormatType.CANONICAL);
+
+            ctxExecutor.triggerAll();
+            checkpointTimer.triggerAll();
+            int pendingAfterSavepoint = 
coordinator.getNumberOfPendingCheckpoints();
+
+            // Trigger the previously scheduled retry; it must be a no-op 
because the state
+            // transitioned to StopWithSavepoint (runIfState gates the action 
on
+            // hadStateTransition).
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("Active checkpoint trigger retry must not fire after 
stopWithSavepoint")
+                    .isEqualTo(pendingAfterSavepoint);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            coordinatorAccessed.set(true);
+                            return coordinator;
+                        }
+                    };
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(false)
+                            .build(ctx);
+
+            exec.requestActiveCheckpointTrigger();
+            assertThat(coordinatorAccessed.get()).isFalse();
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return null;
+                        }
+                    };
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            int baseline = 
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("No checkpoint trigger should be scheduled when 
coordinator is null")
+                    .hasSize(baseline);
+        }
+    }
+
+    @Test
+    void 
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(
+                                    new CheckpointCoordinatorConfiguration
+                                                    
.CheckpointCoordinatorConfigurationBuilder()
+                                            
.setCheckpointInterval(Long.MAX_VALUE)
+                                            .build())
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            // Set up parallelism change so the only blocking guard is 
periodic checkpointing
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+
+            
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+            exec.requestActiveCheckpointTrigger();

Review Comment:
   ```suggestion
               exec.requestActiveCheckpointTrigger();
               ((ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor()).triggerNonPeriodicScheduledTasks();
   ```
   We're missing the state transition here. Without it, the test doesn't 
verify what it is meant to verify (you can confirm this by commenting out the 
periodic checkpoint check in `Executing`).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +190,111 @@ public ScheduledFuture<?> scheduleOperation(Runnable 
callback, Duration delay) {
         return context.runIfState(this, callback, delay);
     }
 
+    @Override
+    public void requestActiveCheckpointTrigger() {
+        if (!activeCheckpointTriggerEnabled) {
+            return;
+        }
+
+        final CheckpointCoordinator checkpointCoordinator =
+                getExecutionGraph().getCheckpointCoordinator();
+        if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+            return;
+        }
+
+        final Optional<Duration> triggerDelay =
+                checkpointCoordinator.getActiveCheckpointTriggerDelay();
+        if (triggerDelay.isEmpty()) {
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: 
checkpoint already in progress.");
+            return;
+        }
+        scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+    }
+
+    private boolean shouldTriggerActiveCheckpoint(
+            @Nullable CheckpointCoordinator checkpointCoordinator) {
+        if (checkpointCoordinator == null
+                || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) 
{
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: 
checkpointing not configured.");
+            return false;
+        }
+        if (!parallelismChanged()) {

Review Comment:
   Generally, looking at the unit tests makes me feel like we have too many 
different locations where we verify whether active checkpoint triggering is 
allowed or not. 🤔 
   
   The parallelism change should only be evaluated in 
`tryFireActiveCheckpointAfterRetry`, because that's where it's relevant. In 
contrast, the `checkpointCoordinator` null check and the periodic checkpointing 
config check don't need to be evaluated in the scheduled task; they should only 
live in the code that eventually schedules the checkpoint triggering.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java:
##########
@@ -400,6 +400,59 @@ private static void assertFinalStateTransitionHappened(
         assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
     }
 
+    @Test
+    void testActiveCheckpointTriggerCalledOnEnteringStabilizing() {
+        final TestingStateTransitionManagerContext ctx =
+                TestingStateTransitionManagerContext.stableContext();
+        ctx.withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        assertThat(testInstance.getPhase()).isInstanceOf(Idling.class);
+        ctx.clearActiveCheckpointTriggerCount();
+
+        testInstance.onChange(true);
+
+        assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+        
assertThat(ctx.getActiveCheckpointTriggerCount()).isGreaterThanOrEqualTo(1);

Review Comment:
   Why do we assert with `isGreaterThanOrEqualTo` here? Shouldn't we be able 
to come up with an exact expected value? The test still succeeds when the 
`clearActiveCheckpointTriggerCount` call is removed.
   
   That applies to all three newly added tests.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +190,111 @@ public ScheduledFuture<?> scheduleOperation(Runnable 
callback, Duration delay) {
         return context.runIfState(this, callback, delay);
     }
 
+    @Override
+    public void requestActiveCheckpointTrigger() {
+        if (!activeCheckpointTriggerEnabled) {
+            return;
+        }
+
+        final CheckpointCoordinator checkpointCoordinator =
+                getExecutionGraph().getCheckpointCoordinator();
+        if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+            return;
+        }
+
+        final Optional<Duration> triggerDelay =
+                checkpointCoordinator.getActiveCheckpointTriggerDelay();
+        if (triggerDelay.isEmpty()) {
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: 
checkpoint already in progress.");
+            return;
+        }
+        scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+    }
+
+    private boolean shouldTriggerActiveCheckpoint(
+            @Nullable CheckpointCoordinator checkpointCoordinator) {
+        if (checkpointCoordinator == null
+                || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) 
{
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: 
checkpointing not configured.");
+            return false;
+        }
+        if (!parallelismChanged()) {
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: 
parallelism unchanged.");
+            return false;
+        }
+        return true;
+    }
+
+    private void scheduleActiveCheckpointTriggerRetry(Duration delay) {
+        if (activeCheckpointTriggerScheduled) {
+            return;
+        }
+        activeCheckpointTriggerScheduled = true;
+        if (!delay.isZero()) {
+            getLogger()
+                    .debug(
+                            "Min pause not satisfied, scheduling active 
checkpoint trigger retry in {} ms.",
+                            delay.toMillis());
+        }
+        context.runIfState(this, this::tryFireActiveCheckpointAfterRetry, 
delay);
+    }
+
+    private void tryFireActiveCheckpointAfterRetry() {
+        activeCheckpointTriggerScheduled = false;
+        final CheckpointCoordinator checkpointCoordinator =
+                getExecutionGraph().getCheckpointCoordinator();
+        if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {

Review Comment:
   nit: I feel like we're choosing the wrong location for the null check here. 
The subsequent calls like `getActiveCheckpointTriggerDelay` rely on 
`checkpointCoordinator` being non-null, but this check is hidden in 
`shouldTriggerActiveCheckpoint`. Doing the null check here instead would make 
the code more readable.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
         }
     }
 
+    @Test
+    void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    
.CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("A retry should be scheduled while min-pause is not 
satisfied")
+                    .isNotEmpty();
+
+            ctx.setExpectStopWithSavepoint(assertNonNull());
+            exec.stopWithSavepoint("file:///tmp/target", true, 
SavepointFormatType.CANONICAL);
+
+            ctxExecutor.triggerAll();
+            checkpointTimer.triggerAll();
+            int pendingAfterSavepoint = 
coordinator.getNumberOfPendingCheckpoints();
+
+            // Trigger the previously scheduled retry; it must be a no-op 
because the state
+            // transitioned to StopWithSavepoint (runIfState gates the action 
on
+            // hadStateTransition).
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("Active checkpoint trigger retry must not fire after 
stopWithSavepoint")
+                    .isEqualTo(pendingAfterSavepoint);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            coordinatorAccessed.set(true);
+                            return coordinator;
+                        }
+                    };
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(false)
+                            .build(ctx);
+
+            exec.requestActiveCheckpointTrigger();
+            assertThat(coordinatorAccessed.get()).isFalse();
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return null;
+                        }
+                    };
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            int baseline = 
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())

Review Comment:
   This test also succeeds if the existence of the `checkpointCoordinator` is 
not evaluated at all, because we're not changing the parallelism.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java:
##########
@@ -400,6 +400,59 @@ private static void assertFinalStateTransitionHappened(
         assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
     }
 
+    @Test
+    void testActiveCheckpointTriggerCalledOnEnteringStabilizing() {
+        final TestingStateTransitionManagerContext ctx =
+                TestingStateTransitionManagerContext.stableContext();
+        ctx.withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        assertThat(testInstance.getPhase()).isInstanceOf(Idling.class);
+        ctx.clearActiveCheckpointTriggerCount();
+
+        testInstance.onChange(true);
+
+        assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+        
assertThat(ctx.getActiveCheckpointTriggerCount()).isGreaterThanOrEqualTo(1);
+    }
+
+    @Test
+    void testActiveCheckpointTriggerCalledOnChangeInStabilizing() {
+        final TestingStateTransitionManagerContext ctx =
+                TestingStateTransitionManagerContext.stableContext();
+        ctx.withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        testInstance.onChange(true);
+        assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+        ctx.clearActiveCheckpointTriggerCount();
+
+        testInstance.onChange(true);
+
+        assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+        
assertThat(ctx.getActiveCheckpointTriggerCount()).isGreaterThanOrEqualTo(1);

Review Comment:
   Wouldn't it be nicer to actually count the trigger calls instead of 
resetting the counter in between? We make two `onChange` calls, so we expect 
the trigger to be called twice.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
         }
     }
 
+    @Test
+    void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    
.CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("A retry should be scheduled while min-pause is not 
satisfied")
+                    .isNotEmpty();
+
+            ctx.setExpectStopWithSavepoint(assertNonNull());
+            exec.stopWithSavepoint("file:///tmp/target", true, 
SavepointFormatType.CANONICAL);
+
+            ctxExecutor.triggerAll();
+            checkpointTimer.triggerAll();
+            int pendingAfterSavepoint = 
coordinator.getNumberOfPendingCheckpoints();
+
+            // Trigger the previously scheduled retry; it must be a no-op 
because the state
+            // transitioned to StopWithSavepoint (runIfState gates the action 
on
+            // hadStateTransition).
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("Active checkpoint trigger retry must not fire after 
stopWithSavepoint")
+                    .isEqualTo(pendingAfterSavepoint);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            coordinatorAccessed.set(true);
+                            return coordinator;
+                        }
+                    };
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(false)
+                            .build(ctx);
+
+            exec.requestActiveCheckpointTrigger();
+            assertThat(coordinatorAccessed.get()).isFalse();
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return null;
+                        }
+                    };
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            int baseline = 
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("No checkpoint trigger should be scheduled when 
coordinator is null")
+                    .hasSize(baseline);
+        }
+    }
+
+    @Test
+    void 
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(
+                                    new CheckpointCoordinatorConfiguration
+                                                    
.CheckpointCoordinatorConfigurationBuilder()
+                                            
.setCheckpointInterval(Long.MAX_VALUE)
+                                            .build())
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            // Set up parallelism change so the only blocking guard is 
periodic checkpointing
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+
+            
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+            exec.requestActiveCheckpointTrigger();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as(
+                            "No checkpoint should be triggered when periodic 
checkpointing is not configured")
+                    .isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    
AccessExecutionJobVertex::getParallelism))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();

Review Comment:
   ```suggestion
               exec.requestActiveCheckpointTrigger();
               ((ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor()).triggerNonPeriodicScheduledTasks();
   ```
   Same here: we're not testing the exact code path the test is supposed to verify.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
         }
     }
 
+    @Test
+    void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    
.CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("A retry should be scheduled while min-pause is not 
satisfied")
+                    .isNotEmpty();
+
+            ctx.setExpectStopWithSavepoint(assertNonNull());
+            exec.stopWithSavepoint("file:///tmp/target", true, 
SavepointFormatType.CANONICAL);
+
+            ctxExecutor.triggerAll();
+            checkpointTimer.triggerAll();
+            int pendingAfterSavepoint = 
coordinator.getNumberOfPendingCheckpoints();
+
+            // Trigger the previously scheduled retry; it must be a no-op 
because the state
+            // transitioned to StopWithSavepoint (runIfState gates the action 
on
+            // hadStateTransition).
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("Active checkpoint trigger retry must not fire after 
stopWithSavepoint")
+                    .isEqualTo(pendingAfterSavepoint);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            coordinatorAccessed.set(true);
+                            return coordinator;
+                        }
+                    };
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(false)
+                            .build(ctx);
+
+            exec.requestActiveCheckpointTrigger();
+            assertThat(coordinatorAccessed.get()).isFalse();
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return null;
+                        }
+                    };
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            int baseline = 
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("No checkpoint trigger should be scheduled when 
coordinator is null")
+                    .hasSize(baseline);
+        }
+    }
+
+    @Test
+    void 
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(
+                                    new CheckpointCoordinatorConfiguration
+                                                    
.CheckpointCoordinatorConfigurationBuilder()
+                                            
.setCheckpointInterval(Long.MAX_VALUE)
+                                            .build())
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            // Set up parallelism change so the only blocking guard is 
periodic checkpointing
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+
+            
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+            exec.requestActiveCheckpointTrigger();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as(
+                            "No checkpoint should be triggered when periodic 
checkpointing is not configured")
+                    .isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    
AccessExecutionJobVertex::getParallelism))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("No checkpoint should be triggered when parallelism is 
unchanged")
+                    .isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenCheckpointInProgress() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            coordinator.triggerCheckpoint(false);
+            checkpointTimer.triggerAll();
+
+            int pendingBefore = coordinator.getNumberOfPendingCheckpoints();
+            assertThat(pendingBefore).isGreaterThan(0);
+            exec.requestActiveCheckpointTrigger();

Review Comment:
   We're missing the state transition here as well.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
         }
     }
 
+    @Test
+    void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    
.CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("A retry should be scheduled while min-pause is not 
satisfied")
+                    .isNotEmpty();
+
+            ctx.setExpectStopWithSavepoint(assertNonNull());
+            exec.stopWithSavepoint("file:///tmp/target", true, 
SavepointFormatType.CANONICAL);
+
+            ctxExecutor.triggerAll();
+            checkpointTimer.triggerAll();
+            int pendingAfterSavepoint = 
coordinator.getNumberOfPendingCheckpoints();
+
+            // Trigger the previously scheduled retry; it must be a no-op 
because the state
+            // transitioned to StopWithSavepoint (runIfState gates the action 
on
+            // hadStateTransition).
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("Active checkpoint trigger retry must not fire after 
stopWithSavepoint")
+                    .isEqualTo(pendingAfterSavepoint);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            coordinatorAccessed.set(true);
+                            return coordinator;
+                        }
+                    };
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(false)
+                            .build(ctx);
+
+            exec.requestActiveCheckpointTrigger();
+            assertThat(coordinatorAccessed.get()).isFalse();
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return null;
+                        }
+                    };
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) 
ctx.getMainThreadExecutor();
+            int baseline = 
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as("No checkpoint trigger should be scheduled when 
coordinator is null")
+                    .hasSize(baseline);
+        }
+    }
+
+    @Test
+    void 
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(
+                                    new CheckpointCoordinatorConfiguration
+                                                    
.CheckpointCoordinatorConfigurationBuilder()
+                                            
.setCheckpointInterval(Long.MAX_VALUE)
+                                            .build())
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            // Set up parallelism change so the only blocking guard is 
periodic checkpointing
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 
1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+
+            
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+            exec.requestActiveCheckpointTrigger();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as(
+                            "No checkpoint should be triggered when periodic 
checkpointing is not configured")
+                    .isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new 
MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> 
getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
AccessExecutionJobVertex::getJobVertexId,
+                                                    
AccessExecutionJobVertex::getParallelism))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            exec.requestActiveCheckpointTrigger();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as("No checkpoint should be triggered when parallelism is 
unchanged")
+                    .isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenCheckpointInProgress() throws 
Exception {

Review Comment:
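   This test passes only because no parallelism change is recorded: the trigger is skipped before the checkpoint-in-progress check is ever reached, so the test doesn't actually exercise the scenario its name describes.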



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to