This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c869326d089 [FLINK-36011] [runtime] Generalize RescaleManager to 
become StateTransitionManager
c869326d089 is described below

commit c869326d089705475481c2c2ea42a6efabb8c828
Author: Zdenek Tison <[email protected]>
AuthorDate: Fri Jul 12 15:01:55 2024 +0200

    [FLINK-36011] [runtime] Generalize RescaleManager to become 
StateTransitionManager
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  10 +-
 .../scheduler/adaptive/DefaultRescaleManager.java  | 232 -------
 .../adaptive/DefaultStateTransitionManager.java    | 434 +++++++++++++
 .../runtime/scheduler/adaptive/Executing.java      |  38 +-
 ...aleManager.java => StateTransitionManager.java} |  39 +-
 .../adaptive/AdaptiveSchedulerBuilder.java         |  14 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   4 +-
 .../adaptive/DefaultRescaleManagerTest.java        | 675 -------------------
 .../DefaultStateTransitionManagerTest.java         | 722 +++++++++++++++++++++
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  41 +-
 ...ger.java => TestingStateTransitionManager.java} |  16 +-
 11 files changed, 1250 insertions(+), 975 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 2fd7b694b73..ebcfc369c82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -349,7 +349,7 @@ public class AdaptiveScheduler
     }
 
     private final Settings settings;
-    private final RescaleManager.Factory rescaleManagerFactory;
+    private final StateTransitionManager.Factory stateTransitionManagerFactory;
 
     private final JobGraph jobGraph;
 
@@ -427,7 +427,7 @@ public class AdaptiveScheduler
             throws JobExecutionException {
         this(
                 settings,
-                DefaultRescaleManager.Factory.fromSettings(settings),
+                DefaultStateTransitionManager.Factory.fromSettings(settings),
                 (metricGroup, checkpointStatsListener) ->
                         new DefaultCheckpointStatsTracker(
                                 
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
@@ -455,7 +455,7 @@ public class AdaptiveScheduler
     @VisibleForTesting
     AdaptiveScheduler(
             Settings settings,
-            RescaleManager.Factory rescaleManagerFactory,
+            StateTransitionManager.Factory stateTransitionManagerFactory,
             BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener, 
CheckpointStatsTracker>
                     checkpointStatsTrackerFactory,
             JobGraph jobGraph,
@@ -480,7 +480,7 @@ public class AdaptiveScheduler
         assertPreconditions(jobGraph);
 
         this.settings = settings;
-        this.rescaleManagerFactory = rescaleManagerFactory;
+        this.stateTransitionManagerFactory = stateTransitionManagerFactory;
 
         this.jobGraph = jobGraph;
         this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), 
jobGraph.getName());
@@ -1175,7 +1175,7 @@ public class AdaptiveScheduler
                         this,
                         userCodeClassLoader,
                         failureCollection,
-                        rescaleManagerFactory,
+                        stateTransitionManagerFactory,
                         settings.getMinParallelismChangeForDesiredRescale(),
                         settings.getRescaleOnFailedCheckpointCount()));
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
deleted file mode 100644
index 0b0fc013357..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.time.temporal.Temporal;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-/**
- * {@code DefaultRescaleManager} manages triggering the next rescaling based 
on when the previous
- * rescale operation happened and the available resources. It handles the 
event based on the
- * following phases (in that order):
- *
- * <ol>
- *   <li>Cooldown phase: No rescaling takes place (its upper threshold is 
defined by {@code
- *       scalingIntervalMin}.
- *   <li>Soft-rescaling phase: Rescaling is triggered if the desired amount of 
resources is
- *       available.
- *   <li>Hard-rescaling phase: Rescaling is triggered if a sufficient amount 
of resources is
- *       available (its lower threshold is defined by (@code 
scalingIntervalMax}).
- * </ol>
- *
- * <p>Thread-safety: This class is not implemented in a thread-safe manner and 
relies on the fact
- * that any method call happens within a single thread.
- *
- * @see Executing
- */
-@NotThreadSafe
-public class DefaultRescaleManager implements RescaleManager {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultRescaleManager.class);
-
-    private final Temporal initializationTime;
-    private final Supplier<Temporal> clock;
-
-    @VisibleForTesting final Duration scalingIntervalMin;
-    @VisibleForTesting @Nullable final Duration scalingIntervalMax;
-
-    private final RescaleManager.Context rescaleContext;
-
-    private boolean rescaleScheduled = false;
-
-    @VisibleForTesting final Duration maxTriggerDelay;
-
-    /**
-     * {@code triggerFuture} is used to allow triggering a scheduled callback. 
Rather than
-     * scheduling the callback itself, the callback is just chained with the 
future. The completion
-     * of the future is then scheduled which will, as a consequence, run the 
callback as part of the
-     * scheduled operation.
-     *
-     * <p>{@code triggerFuture} can be used to trigger the callback even 
earlier (before the
-     * scheduled delay has passed). See {@link #onTrigger()}.
-     */
-    private CompletableFuture<Void> triggerFuture;
-
-    DefaultRescaleManager(
-            Temporal initializationTime,
-            RescaleManager.Context rescaleContext,
-            Duration scalingIntervalMin,
-            @Nullable Duration scalingIntervalMax,
-            Duration maxTriggerDelay) {
-        this(
-                initializationTime,
-                Instant::now,
-                rescaleContext,
-                scalingIntervalMin,
-                scalingIntervalMax,
-                maxTriggerDelay);
-    }
-
-    @VisibleForTesting
-    DefaultRescaleManager(
-            Temporal initializationTime,
-            Supplier<Temporal> clock,
-            RescaleManager.Context rescaleContext,
-            Duration scalingIntervalMin,
-            @Nullable Duration scalingIntervalMax,
-            Duration maxTriggerDelay) {
-        this.initializationTime = initializationTime;
-        this.clock = clock;
-
-        this.maxTriggerDelay = maxTriggerDelay;
-        this.triggerFuture = FutureUtils.completedVoidFuture();
-
-        Preconditions.checkArgument(
-                scalingIntervalMax == null || 
scalingIntervalMin.compareTo(scalingIntervalMax) <= 0,
-                "scalingIntervalMax should at least match or be longer than 
scalingIntervalMin.");
-        this.scalingIntervalMin = scalingIntervalMin;
-        this.scalingIntervalMax = scalingIntervalMax;
-
-        this.rescaleContext = rescaleContext;
-    }
-
-    @Override
-    public void onChange() {
-        if (this.triggerFuture.isDone()) {
-            this.triggerFuture = 
scheduleOperationWithTrigger(this::evaluateChangeEvent);
-        }
-    }
-
-    @Override
-    public void onTrigger() {
-        if (!this.triggerFuture.isDone()) {
-            this.triggerFuture.complete(null);
-            LOG.debug(
-                    "A rescale trigger event was observed causing the rescale 
verification logic to be initiated.");
-        } else {
-            LOG.debug(
-                    "A rescale trigger event was observed outside of a rescale 
cycle. No action taken.");
-        }
-    }
-
-    private void evaluateChangeEvent() {
-        if (timeSinceLastRescale().compareTo(scalingIntervalMin) > 0) {
-            maybeRescale();
-        } else if (!rescaleScheduled) {
-            rescaleScheduled = true;
-            rescaleContext.scheduleOperation(this::maybeRescale, 
scalingIntervalMin);
-        }
-    }
-
-    private CompletableFuture<Void> scheduleOperationWithTrigger(Runnable 
callback) {
-        final CompletableFuture<Void> triggerFuture = new 
CompletableFuture<>();
-        triggerFuture.thenRun(callback);
-        this.rescaleContext.scheduleOperation(
-                () -> triggerFuture.complete(null), this.maxTriggerDelay);
-
-        return triggerFuture;
-    }
-
-    private Duration timeSinceLastRescale() {
-        return Duration.between(this.initializationTime, clock.get());
-    }
-
-    private void maybeRescale() {
-        rescaleScheduled = false;
-        if (rescaleContext.hasDesiredResources()) {
-            LOG.info("Desired parallelism for job was reached: Rescaling will 
be triggered.");
-            rescaleContext.rescale();
-        } else if (scalingIntervalMax != null) {
-            LOG.info(
-                    "The longer the pipeline runs, the more the (small) 
resource gain is worth the restarting time. "
-                            + "Last resource added does not meet the 
configured minimal parallelism change. Forced rescaling will be triggered after 
{} if the resource is still there.",
-                    scalingIntervalMax);
-
-            // reasoning for inconsistent scheduling:
-            // https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr
-            if (timeSinceLastRescale().compareTo(scalingIntervalMax) > 0) {
-                rescaleWithSufficientResources();
-            } else {
-                rescaleContext.scheduleOperation(
-                        this::rescaleWithSufficientResources, 
scalingIntervalMax);
-            }
-        }
-    }
-
-    private void rescaleWithSufficientResources() {
-        if (rescaleContext.hasSufficientResources()) {
-            LOG.info(
-                    "Resources for desired job parallelism couldn't be 
collected after {}: Rescaling will be enforced.",
-                    scalingIntervalMax);
-            rescaleContext.rescale();
-        }
-    }
-
-    public static class Factory implements RescaleManager.Factory {
-
-        private final Duration scalingIntervalMin;
-        @Nullable private final Duration scalingIntervalMax;
-        private final Duration maximumDelayForTrigger;
-
-        /**
-         * Creates a {@code Factory} instance based on the {@link 
AdaptiveScheduler}'s {@code
-         * Settings} for rescaling.
-         */
-        public static Factory fromSettings(AdaptiveScheduler.Settings 
settings) {
-            // it's not ideal that we use a AdaptiveScheduler internal class 
here. We might want to
-            // change that as part of a more general alignment of the 
rescaling configuration.
-            return new Factory(
-                    settings.getScalingIntervalMin(),
-                    settings.getScalingIntervalMax(),
-                    settings.getMaximumDelayForTriggeringRescale());
-        }
-
-        private Factory(
-                Duration scalingIntervalMin,
-                @Nullable Duration scalingIntervalMax,
-                Duration maximumDelayForTrigger) {
-            this.scalingIntervalMin = scalingIntervalMin;
-            this.scalingIntervalMax = scalingIntervalMax;
-            this.maximumDelayForTrigger = maximumDelayForTrigger;
-        }
-
-        @Override
-        public DefaultRescaleManager create(Context rescaleContext, Instant 
lastRescale) {
-            return new DefaultRescaleManager(
-                    lastRescale,
-                    rescaleContext,
-                    scalingIntervalMin,
-                    scalingIntervalMax,
-                    maximumDelayForTrigger);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
new file mode 100644
index 00000000000..aa1ea2c965d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+/**
+ * {@code DefaultStateTransitionManager} is a state machine which manages the 
{@link
+ * AdaptiveScheduler}'s state transitions based on the previous transition 
time and the available
+ * resources. See {@link Phase} for details on each individual phase of this 
state machine. Note: We
+ * use the term phase here to avoid confusion with the state used in the 
{@link AdaptiveScheduler}.
+ *
+ * <pre>
+ * {@link Cooldown}
+ *   |
+ *   +--> {@link Idling}
+ *   |      |
+ *   |      V
+ *   +--> {@link Stabilizing}
+ *          |
+ *          +--> {@link Stabilized} --> {@link Idling}
+ *          |      |
+ *          |      V
+ *          \--> {@link Transitioning}
+ * </pre>
+ *
+ * <p>Thread-safety: This class is not implemented in a thread-safe manner and 
relies on the fact
+ * that any method call happens within a single thread.
+ *
+ * @see Executing
+ */
+@NotThreadSafe
+public class DefaultStateTransitionManager implements StateTransitionManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStateTransitionManager.class);
+
+    private final Supplier<Temporal> clock;
+    private final StateTransitionManager.Context transitionContext;
+    private Phase phase;
+    private final List<ScheduledFuture<?>> scheduledFutures;
+
+    @VisibleForTesting final Duration cooldownTimeout;
+    @Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
+    @VisibleForTesting final Duration maxTriggerDelay;
+
+    DefaultStateTransitionManager(
+            Temporal initializationTime,
+            StateTransitionManager.Context transitionContext,
+            Duration cooldownTimeout,
+            @Nullable Duration resourceStabilizationTimeout,
+            Duration maxTriggerDelay) {
+        this(
+                initializationTime,
+                Instant::now,
+                transitionContext,
+                cooldownTimeout,
+                resourceStabilizationTimeout,
+                maxTriggerDelay);
+    }
+
+    @VisibleForTesting
+    DefaultStateTransitionManager(
+            Temporal initializationTime,
+            Supplier<Temporal> clock,
+            StateTransitionManager.Context transitionContext,
+            Duration cooldownTimeout,
+            @Nullable Duration resourceStabilizationTimeout,
+            Duration maxTriggerDelay) {
+
+        this.clock = clock;
+        this.maxTriggerDelay = maxTriggerDelay;
+        this.cooldownTimeout = cooldownTimeout;
+        this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+        this.transitionContext = transitionContext;
+        this.scheduledFutures = new ArrayList<>();
+        this.phase = new Cooldown(initializationTime, clock, this, 
cooldownTimeout);
+    }
+
+    @Override
+    public void onChange() {
+        phase.onChange();
+    }
+
+    @Override
+    public void onTrigger() {
+        phase.onTrigger();
+    }
+
+    @Override
+    public void close() {
+        scheduledFutures.forEach(future -> future.cancel(true));
+        scheduledFutures.clear();
+    }
+
+    @VisibleForTesting
+    Phase getPhase() {
+        return phase;
+    }
+
+    private void progressToIdling() {
+        progressToPhase(new Idling(clock, this));
+    }
+
+    private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
+        progressToPhase(
+                new Stabilizing(
+                        clock,
+                        this,
+                        resourceStabilizationTimeout,
+                        firstChangeEventTimestamp,
+                        maxTriggerDelay));
+    }
+
+    private void progressToStabilized(Temporal firstChangeEventTimestamp) {
+        progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, 
maxTriggerDelay));
+    }
+
+    private void triggerTransitionToSubsequentState() {
+        progressToPhase(new Transitioning(clock, this));
+        transitionContext.transitionToSubsequentState();
+    }
+
+    private void progressToPhase(Phase newPhase) {
+        Preconditions.checkState(
+                !(phase instanceof Transitioning),
+                "The state transition operation has already been triggered.");
+        LOG.debug("Transitioning from {} to {}.", phase, newPhase);
+        phase = newPhase;
+    }
+
+    @VisibleForTesting
+    void scheduleFromNow(Runnable callback, Duration delay, Phase phase) {
+        scheduledFutures.add(
+                transitionContext.scheduleOperation(() -> runIfPhase(phase, 
callback), delay));
+    }
+
+    private void runIfPhase(Phase expectedPhase, Runnable callback) {
+        if (getPhase() == expectedPhase) {
+            callback.run();
+        } else {
+            LOG.debug(
+                    "Ignoring scheduled action because expected phase {} is 
not the actual phase {}.",
+                    expectedPhase,
+                    getPhase());
+        }
+    }
+
+    /** Factory for creating {@link DefaultStateTransitionManager} instances. 
*/
+    public static class Factory implements StateTransitionManager.Factory {
+
+        private final Duration cooldownTimeout;
+        @Nullable private final Duration resourceStabilizationTimeout;
+        private final Duration maximumDelayForTrigger;
+
+        /**
+         * Creates a {@code Factory} instance based on the {@link 
AdaptiveScheduler}'s {@code
+         * Settings} for rescaling.
+         */
+        public static Factory fromSettings(AdaptiveScheduler.Settings 
settings) {
+            // it's not ideal that we use an AdaptiveScheduler internal class 
here. We might want to
+            // change that as part of a more general alignment of the 
rescaling configuration.
+            return new Factory(
+                    settings.getScalingIntervalMin(),
+                    settings.getScalingIntervalMax(),
+                    settings.getMaximumDelayForTriggeringRescale());
+        }
+
+        private Factory(
+                Duration cooldownTimeout,
+                @Nullable Duration resourceStabilizationTimeout,
+                Duration maximumDelayForTrigger) {
+            this.cooldownTimeout = cooldownTimeout;
+            this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+            this.maximumDelayForTrigger = maximumDelayForTrigger;
+        }
+
+        @Override
+        public DefaultStateTransitionManager create(Context context, Instant 
lastStateTransition) {
+            return new DefaultStateTransitionManager(
+                    lastStateTransition,
+                    context,
+                    cooldownTimeout,
+                    resourceStabilizationTimeout,
+                    maximumDelayForTrigger);
+        }
+    }
+
+    /**
+     * A phase in the state machine of the {@link 
DefaultStateTransitionManager}. Each phase is
+     * responsible for a specific part of the state transition process.
+     */
+    @VisibleForTesting
+    abstract static class Phase {
+
+        private final Supplier<Temporal> clock;
+        private final DefaultStateTransitionManager context;
+
+        @VisibleForTesting
+        Phase(Supplier<Temporal> clock, DefaultStateTransitionManager context) 
{
+            this.clock = clock;
+            this.context = context;
+        }
+
+        Temporal now() {
+            return clock.get();
+        }
+
+        DefaultStateTransitionManager context() {
+            return context;
+        }
+
+        void scheduleRelativelyTo(Runnable callback, Temporal startOfTimeout, 
Duration timeout) {
+            final Duration passedTimeout = Duration.between(startOfTimeout, 
now());
+            Preconditions.checkArgument(
+                    !passedTimeout.isNegative(),
+                    "The startOfTimeout (%s) should be in the past but is 
after the current time.",
+                    startOfTimeout);
+
+            final Duration timeoutLeft = timeout.minus(passedTimeout);
+            scheduleFromNow(callback, timeoutLeft.isNegative() ? Duration.ZERO 
: timeoutLeft);
+        }
+
+        void scheduleFromNow(Runnable callback, Duration delay) {
+            context.scheduleFromNow(callback, delay, this);
+        }
+
+        boolean hasDesiredResources() {
+            return context.transitionContext.hasDesiredResources();
+        }
+
+        boolean hasSufficientResources() {
+            return context.transitionContext.hasSufficientResources();
+        }
+
+        void onChange() {}
+
+        void onTrigger() {}
+    }
+
+    /**
+     * {@link Phase} to prevent any rescaling. {@link 
StateTransitionManager#onChange()} events will
+     * be monitored and forwarded to the next phase. {@link 
StateTransitionManager#onTrigger()}
+     * events will be ignored.
+     */
+    @VisibleForTesting
+    static final class Cooldown extends Phase {
+
+        @Nullable private Temporal firstChangeEventTimestamp;
+
+        private Cooldown(
+                Temporal timeOfLastRescale,
+                Supplier<Temporal> clock,
+                DefaultStateTransitionManager context,
+                Duration cooldownTimeout) {
+            super(clock, context);
+
+            this.scheduleRelativelyTo(this::finalizeCooldown, 
timeOfLastRescale, cooldownTimeout);
+        }
+
+        @Override
+        void onChange() {
+            if (hasSufficientResources() && firstChangeEventTimestamp == null) 
{
+                firstChangeEventTimestamp = now();
+            }
+        }
+
+        private void finalizeCooldown() {
+            if (firstChangeEventTimestamp == null) {
+                context().progressToIdling();
+            } else {
+                context().progressToStabilizing(firstChangeEventTimestamp);
+            }
+        }
+    }
+
+    /**
+     * {@link Phase} which follows the {@link Cooldown} phase if no {@link
+     * StateTransitionManager#onChange()} was observed, yet. The {@code
+     * DefaultStateTransitionManager} waits for a first {@link 
StateTransitionManager#onChange()}
+     * event. {@link StateTransitionManager#onTrigger()} events will be 
ignored.
+     */
+    @VisibleForTesting
+    static final class Idling extends Phase {
+
+        private Idling(Supplier<Temporal> clock, DefaultStateTransitionManager 
context) {
+            super(clock, context);
+        }
+
+        @Override
+        void onChange() {
+            if (hasSufficientResources()) {
+                context().progressToStabilizing(now());
+            }
+        }
+    }
+
+    /**
+     * {@link Phase} that handles the resources stabilization. In this phase, 
{@link
+     * StateTransitionManager#onTrigger()} will initiate rescaling if desired 
resources are met and
+     * {@link StateTransitionManager#onChange()} will schedule the evaluation 
of the desired
+     * resources.
+     */
+    static final class Stabilizing extends Phase {
+
+        private Temporal onChangeEventTimestamp;
+        private final Duration maxTriggerDelay;
+        private boolean evaluationScheduled = false;
+
+        private Stabilizing(
+                Supplier<Temporal> clock,
+                DefaultStateTransitionManager context,
+                @Nullable Duration resourceStabilizationTimeout,
+                Temporal firstOnChangeEventTimestamp,
+                Duration maxTriggerDelay) {
+            super(clock, context);
+            this.onChangeEventTimestamp = firstOnChangeEventTimestamp;
+            this.maxTriggerDelay = maxTriggerDelay;
+
+            if (resourceStabilizationTimeout != null) {
+                scheduleRelativelyTo(
+                        () -> 
context().progressToStabilized(firstOnChangeEventTimestamp),
+                        firstOnChangeEventTimestamp,
+                        resourceStabilizationTimeout);
+            }
+            scheduleTransitionEvaluation();
+        }
+
+        @Override
+        void onChange() {
+            // schedule another desired-resource evaluation in scenarios where 
the previous change
+            // event was already handled by an onTrigger callback with a no-op
+            onChangeEventTimestamp = now();
+            scheduleTransitionEvaluation();
+        }
+
+        @Override
+        void onTrigger() {
+            transitionToSubSequentStateForDesiredResources();
+        }
+
+        private void scheduleTransitionEvaluation() {
+            if (!evaluationScheduled) {
+                evaluationScheduled = true;
+                this.scheduleRelativelyTo(
+                        () -> {
+                            evaluationScheduled = false;
+                            transitionToSubSequentStateForDesiredResources();
+                        },
+                        onChangeEventTimestamp,
+                        maxTriggerDelay);
+            }
+        }
+
+        private void transitionToSubSequentStateForDesiredResources() {
+            if (hasDesiredResources()) {
+                context().triggerTransitionToSubsequentState();
+            } else {
+                LOG.debug(
+                        "Desired resources are not met, skipping the 
transition to the subsequent state.");
+            }
+        }
+    }
+
+    /**
+     * {@link Phase} that handles the post-stabilization phase. A {@link
+     * StateTransitionManager#onTrigger()} event initiates rescaling if 
sufficient resources are
+     * available; otherwise transitioning to {@link Idling} will be performed.
+     */
+    @VisibleForTesting
+    static final class Stabilized extends Phase {
+
+        private Stabilized(
+                Supplier<Temporal> clock,
+                DefaultStateTransitionManager context,
+                Temporal firstChangeEventTimestamp,
+                Duration maxTriggerDelay) {
+            super(clock, context);
+            this.scheduleRelativelyTo(this::onTrigger, 
firstChangeEventTimestamp, maxTriggerDelay);
+        }
+
+        @Override
+        void onTrigger() {
+            if (hasSufficientResources()) {
+                context().triggerTransitionToSubsequentState();
+            } else {
+                LOG.debug("Sufficient resources are not met, progressing to 
idling.");
+                context().progressToIdling();
+            }
+        }
+    }
+
+    /**
+     * In this final {@link Phase} no additional transition is possible: {@link
+     * StateTransitionManager#onChange()} and {@link 
StateTransitionManager#onTrigger()} events will
+     * be ignored.
+     */
+    @VisibleForTesting
+    static final class Transitioning extends Phase {
+        private Transitioning(Supplier<Temporal> clock, 
DefaultStateTransitionManager context) {
+            super(clock, context);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 1fcd23884f5..f7dcb8d97cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -58,13 +58,13 @@ import java.util.stream.Collectors;
 
 /** State which represents a running job with an {@link ExecutionGraph} and 
assigned slots. */
 class Executing extends StateWithExecutionGraph
-        implements ResourceListener, RescaleManager.Context, 
CheckpointStatsListener {
+        implements ResourceListener, StateTransitionManager.Context, 
CheckpointStatsListener {
 
     private final Context context;
 
     private final RescalingController sufficientResourcesController;
     private final RescalingController desiredResourcesController;
-    private final RescaleManager rescaleManager;
+    private final StateTransitionManager stateTransitionManager;
     private final int rescaleOnFailedCheckpointCount;
     // null indicates that there was no change event observed, yet
     @Nullable private AtomicInteger failedCheckpointCountdown;
@@ -77,7 +77,7 @@ class Executing extends StateWithExecutionGraph
             Context context,
             ClassLoader userCodeClassLoader,
             List<ExceptionHistoryEntry> failureCollection,
-            RescaleManager.Factory rescaleManagerFactory,
+            StateTransitionManager.Factory stateTransitionManagerFactory,
             int minParallelismChangeForRescale,
             int rescaleOnFailedCheckpointCount,
             Instant lastRescale) {
@@ -96,7 +96,7 @@ class Executing extends StateWithExecutionGraph
         this.sufficientResourcesController = new 
EnforceParallelismChangeRescalingController();
         this.desiredResourcesController =
                 new 
EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale);
-        this.rescaleManager = rescaleManagerFactory.create(this, lastRescale);
+        this.stateTransitionManager = 
stateTransitionManagerFactory.create(this, lastRescale);
 
         Preconditions.checkArgument(
                 rescaleOnFailedCheckpointCount > 0,
@@ -110,8 +110,8 @@ class Executing extends StateWithExecutionGraph
         context.runIfState(
                 this,
                 () -> {
-                    rescaleManager.onChange();
-                    rescaleManager.onTrigger();
+                    stateTransitionManager.onChange();
+                    stateTransitionManager.onTrigger();
                 },
                 Duration.ZERO);
     }
@@ -147,12 +147,12 @@ class Executing extends StateWithExecutionGraph
     }
 
     @Override
-    public void scheduleOperation(Runnable callback, Duration delay) {
-        context.runIfState(this, callback, delay);
+    public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration 
delay) {
+        return context.runIfState(this, callback, delay);
     }
 
     @Override
-    public void rescale() {
+    public void transitionToSubsequentState() {
         context.goToRestarting(
                 getExecutionGraph(),
                 getExecutionGraphHandler(),
@@ -186,6 +186,12 @@ class Executing extends StateWithExecutionGraph
         
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }
 
+    @Override
+    public void onLeave(Class<? extends State> newState) {
+        stateTransitionManager.close();
+        super.onLeave(newState);
+    }
+
     private void deploy() {
         for (ExecutionJobVertex executionJobVertex :
                 getExecutionGraph().getVerticesTopologically()) {
@@ -212,13 +218,13 @@ class Executing extends StateWithExecutionGraph
 
     @Override
     public void onNewResourcesAvailable() {
-        rescaleManager.onChange();
+        stateTransitionManager.onChange();
         initializeFailedCheckpointCountdownIfUnset();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        rescaleManager.onChange();
+        stateTransitionManager.onChange();
         initializeFailedCheckpointCountdownIfUnset();
     }
 
@@ -236,7 +242,7 @@ class Executing extends StateWithExecutionGraph
     }
 
     private void triggerPotentialRescale() {
-        rescaleManager.onTrigger();
+        stateTransitionManager.onTrigger();
         this.failedCheckpointCountdown = null;
     }
 
@@ -322,7 +328,7 @@ class Executing extends StateWithExecutionGraph
         private final OperatorCoordinatorHandler operatorCoordinatorHandler;
         private final ClassLoader userCodeClassLoader;
         private final List<ExceptionHistoryEntry> failureCollection;
-        private final RescaleManager.Factory rescaleManagerFactory;
+        private final StateTransitionManager.Factory 
stateTransitionManagerFactory;
         private final int minParallelismChangeForRescale;
         private final int rescaleOnFailedCheckpointCount;
 
@@ -334,7 +340,7 @@ class Executing extends StateWithExecutionGraph
                 Context context,
                 ClassLoader userCodeClassLoader,
                 List<ExceptionHistoryEntry> failureCollection,
-                RescaleManager.Factory rescaleManagerFactory,
+                StateTransitionManager.Factory stateTransitionManagerFactory,
                 int minParallelismChangeForRescale,
                 int rescaleOnFailedCheckpointCount) {
             this.context = context;
@@ -344,7 +350,7 @@ class Executing extends StateWithExecutionGraph
             this.operatorCoordinatorHandler = operatorCoordinatorHandler;
             this.userCodeClassLoader = userCodeClassLoader;
             this.failureCollection = failureCollection;
-            this.rescaleManagerFactory = rescaleManagerFactory;
+            this.stateTransitionManagerFactory = stateTransitionManagerFactory;
             this.minParallelismChangeForRescale = 
minParallelismChangeForRescale;
             this.rescaleOnFailedCheckpointCount = 
rescaleOnFailedCheckpointCount;
         }
@@ -362,7 +368,7 @@ class Executing extends StateWithExecutionGraph
                     context,
                     userCodeClassLoader,
                     failureCollection,
-                    rescaleManagerFactory,
+                    stateTransitionManagerFactory,
                     minParallelismChangeForRescale,
                     rescaleOnFailedCheckpointCount,
                     Instant.now());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
similarity index 55%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
index a2e53bbe027..a6a6aaaca2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
@@ -20,21 +20,30 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.concurrent.ScheduledFuture;
 
-/** The {@code RescaleManager} decides on whether rescaling should happen or 
not. */
-public interface RescaleManager {
+/**
+ * The {@code StateTransitionManager} decides on whether {@link 
AdaptiveScheduler} state transition
+ * should happen or not.
+ */
+public interface StateTransitionManager {
 
-    /** Is called if the environment changed in a way that a rescaling could 
be considered. */
+    /**
+     * Is called if the environment changed in a way that a state transition 
could be considered.
+     */
     void onChange();
 
     /**
      * Is called when any previous observed environment changes shall be 
verified possibly
-     * triggering a rescale operation.
+     * triggering a state transition operation.
      */
     void onTrigger();
 
+    /** Is called when the state transition manager should be closed. */
+    default void close() {}
+
     /**
-     * The interface that can be used by the {@code RescaleManager} to 
communicate with the
+     * The interface that can be used by the {@code StateTransitionManager} to 
communicate with the
      * underlying system.
      */
     interface Context {
@@ -51,20 +60,24 @@ public interface RescaleManager {
          */
         boolean hasDesiredResources();
 
-        /** Triggers the rescaling of the job. */
-        void rescale();
+        /** Triggers the transition to the subsequent state of the {@link 
AdaptiveScheduler}. */
+        void transitionToSubsequentState();
 
-        /** Runs operation with a given delay in the underlying main thread. */
-        void scheduleOperation(Runnable callback, Duration delay);
+        /**
+         * Runs operation with a given delay in the underlying main thread.
+         *
+         * @return a ScheduledFuture representing pending completion of the 
operation.
+         */
+        ScheduledFuture<?> scheduleOperation(Runnable callback, Duration 
delay);
     }
 
-    /** Interface for creating {@code RescaleManager} instances. */
+    /** Interface for creating {@code StateTransitionManager} instances. */
     interface Factory {
 
         /**
-         * Creates a {@code RescaleManager} instance for the given {@code 
rescaleContext} and
-         * previous rescale time.
+         * Creates a {@code StateTransitionManager} instance for the given 
{@code Context} and
+         * previous state transition time.
          */
-        RescaleManager create(Context rescaleContext, Instant lastRescale);
+        StateTransitionManager create(Context context, Instant 
lastStateTransition);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 433c6d66a90..26bd64e68b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -97,7 +97,7 @@ public class AdaptiveSchedulerBuilder {
     /**
      * {@code null} indicates that the default factory will be used based on 
the set configuration.
      */
-    @Nullable private RescaleManager.Factory rescaleManagerFactory = null;
+    @Nullable private StateTransitionManager.Factory 
stateTransitionManagerFactory = null;
 
     private BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener, 
CheckpointStatsTracker>
             checkpointStatsTrackerFactory =
@@ -224,9 +224,9 @@ public class AdaptiveSchedulerBuilder {
         return this;
     }
 
-    public AdaptiveSchedulerBuilder setRescaleManagerFactory(
-            @Nullable RescaleManager.Factory rescaleManagerFactory) {
-        this.rescaleManagerFactory = rescaleManagerFactory;
+    public AdaptiveSchedulerBuilder setStateTransitionManagerFactory(
+            @Nullable StateTransitionManager.Factory 
stateTransitionManagerFactory) {
+        this.stateTransitionManagerFactory = stateTransitionManagerFactory;
         return this;
     }
 
@@ -259,9 +259,9 @@ public class AdaptiveSchedulerBuilder {
                 AdaptiveScheduler.Settings.of(jobMasterConfiguration);
         return new AdaptiveScheduler(
                 settings,
-                rescaleManagerFactory == null
-                        ? DefaultRescaleManager.Factory.fromSettings(settings)
-                        : rescaleManagerFactory,
+                stateTransitionManagerFactory == null
+                        ? 
DefaultStateTransitionManager.Factory.fromSettings(settings)
+                        : stateTransitionManagerFactory,
                 checkpointStatsTrackerFactory,
                 jobGraph,
                 jobResourceRequirements,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index ab0b96fa765..5bc3d859c48 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -2306,8 +2306,8 @@ public class AdaptiveSchedulerTest {
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setJobMasterConfiguration(config)
                         .setDeclarativeSlotPool(slotPool)
-                        .setRescaleManagerFactory(
-                                new TestingRescaleManager.Factory(
+                        .setStateTransitionManagerFactory(
+                                new TestingStateTransitionManager.Factory(
                                         () -> {},
                                         () -> {
                                             singleThreadMainThreadExecutor
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
deleted file mode 100644
index 94e6a57ea09..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
+++ /dev/null
@@ -1,675 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.util.ConfigurationException;
-
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-class DefaultRescaleManagerTest {
-
-    @Test
-    void testProperConfiguration() throws ConfigurationException {
-        final Duration scalingIntervalMin = Duration.ofMillis(1337);
-        final Duration scalingIntervalMax = Duration.ofMillis(7331);
-        final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
-
-        final Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, 
scalingIntervalMin);
-        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, 
scalingIntervalMax);
-        configuration.set(
-                JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, 
maximumDelayForRescaleTrigger);
-
-        final DefaultRescaleManager testInstance =
-                DefaultRescaleManager.Factory.fromSettings(
-                                AdaptiveScheduler.Settings.of(configuration))
-                        .create(TestingRescaleManagerContext.stableContext(), 
Instant.now());
-        
assertThat(testInstance.scalingIntervalMin).isEqualTo(scalingIntervalMin);
-        
assertThat(testInstance.scalingIntervalMax).isEqualTo(scalingIntervalMax);
-        
assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger);
-    }
-
-    @Test
-    void testInvalidConfiguration() {
-        final Duration cooldownThreshold = Duration.ofMinutes(2);
-        final TestingRescaleManagerContext ctx = 
TestingRescaleManagerContext.stableContext();
-        assertThatThrownBy(
-                        () ->
-                                new DefaultRescaleManager(
-                                        Instant.now(),
-                                        ctx,
-                                        cooldownThreshold,
-                                        cooldownThreshold.minusNanos(1),
-                                        Duration.ofHours(5)))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    void triggerWithoutChangeEventNoopInCooldownPhase() {
-        triggerWithoutChangeEventNoop(
-                
TestingRescaleManagerContext::createTestInstanceInCooldownPhase);
-    }
-
-    @Test
-    void triggerWithoutChangeEventNoopInSoftRescalingPhase() {
-        triggerWithoutChangeEventNoop(
-                
TestingRescaleManagerContext::createTestInstanceInSoftRescalePhase);
-    }
-
-    @Test
-    void triggerWithoutChangeEventNoopInHardRescalingPhase() {
-        triggerWithoutChangeEventNoop(
-                
TestingRescaleManagerContext::createTestInstanceInHardRescalePhase);
-    }
-
-    private void triggerWithoutChangeEventNoop(
-            Function<TestingRescaleManagerContext, DefaultRescaleManager> 
testInstanceCreator) {
-        final TestingRescaleManagerContext ctx =
-                
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
-        final DefaultRescaleManager testInstance = 
testInstanceCreator.apply(ctx);
-
-        testInstance.onTrigger();
-
-        assertThat(ctx.rescaleWasTriggered())
-                .as(
-                        "No rescaling should have been triggered due to the 
missing change event despite the fact that desired rescaling would be 
possible.")
-                .isFalse();
-        assertThat(ctx.additionalTasksWaiting()).as("No tasks should be 
scheduled.").isFalse();
-    }
-
-    @Test
-    void testDesiredChangeEventDuringCooldown() {
-        final TestingRescaleManagerContext softScalePossibleCtx =
-                
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
-        final DefaultRescaleManager testInstance =
-                softScalePossibleCtx.createTestInstanceInCooldownPhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(softScalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(softScalePossibleCtx);
-
-        softScalePossibleCtx.transitionIntoSoftScalingTimeframe();
-
-        assertFinalStateWithRescale(softScalePossibleCtx);
-    }
-
-    @Test
-    void testDesiredChangeEventInSoftRescalePhase() {
-        final TestingRescaleManagerContext desiredRescalePossibleCtx =
-                
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
-        final DefaultRescaleManager testInstance =
-                
desiredRescalePossibleCtx.createTestInstanceInSoftRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(desiredRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertFinalStateWithRescale(desiredRescalePossibleCtx);
-    }
-
-    @Test
-    void testDesiredChangeEventInHardRescalePhase() {
-        final TestingRescaleManagerContext desiredRescalePossibleCtx =
-                
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
-        final DefaultRescaleManager testInstance =
-                
desiredRescalePossibleCtx.createTestInstanceInHardRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(desiredRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertFinalStateWithRescale(desiredRescalePossibleCtx);
-    }
-
-    @Test
-    void testNoRescaleInCooldownPhase() {
-        final TestingRescaleManagerContext noRescalePossibleCtx =
-                TestingRescaleManagerContext.stableContext();
-        final DefaultRescaleManager testInstance =
-                noRescalePossibleCtx.createTestInstanceInCooldownPhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
-        noRescalePossibleCtx.transitionIntoSoftScalingTimeframe();
-
-        assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
-        noRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
-        assertThat(noRescalePossibleCtx.rescaleWasTriggered())
-                .as("No rescaling should have happened even in the 
hard-rescaling phase.")
-                .isFalse();
-        assertThat(noRescalePossibleCtx.additionalTasksWaiting())
-                .as("No further tasks should have been waiting for execution.")
-                .isTrue();
-    }
-
-    @Test
-    void testNoRescaleInSoftRescalePhase() {
-        final TestingRescaleManagerContext noRescalePossibleCtx =
-                TestingRescaleManagerContext.stableContext();
-        final DefaultRescaleManager testInstance =
-                noRescalePossibleCtx.createTestInstanceInSoftRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
-        noRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
-        assertThat(noRescalePossibleCtx.rescaleWasTriggered())
-                .as("No rescaling should have happened even in the 
hard-rescaling phase.")
-                .isFalse();
-        assertThat(noRescalePossibleCtx.additionalTasksWaiting())
-                .as("No further tasks should have been waiting for execution.")
-                .isTrue();
-    }
-
-    @Test
-    void testNoResaleInHardRescalePhase() {
-        final TestingRescaleManagerContext noRescalePossibleCtx =
-                TestingRescaleManagerContext.stableContext();
-        final DefaultRescaleManager testInstance =
-                noRescalePossibleCtx.createTestInstanceInHardRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertThat(noRescalePossibleCtx.rescaleWasTriggered())
-                .as("No rescaling should have happened even in the 
hard-rescaling phase.")
-                .isFalse();
-        assertThat(noRescalePossibleCtx.additionalTasksWaiting())
-                .as("No further tasks should have been waiting for execution.")
-                .isTrue();
-    }
-
-    @Test
-    void testSufficientChangeInCooldownPhase() {
-        final TestingRescaleManagerContext hardRescalePossibleCtx =
-                
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
-        final DefaultRescaleManager testInstance =
-                hardRescalePossibleCtx.createTestInstanceInCooldownPhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
-        hardRescalePossibleCtx.transitionIntoSoftScalingTimeframe();
-
-        assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
-        hardRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
-        assertFinalStateWithRescale(hardRescalePossibleCtx);
-    }
-
-    @Test
-    void testSufficientChangeInSoftRescalePhase() {
-        final TestingRescaleManagerContext hardRescalePossibleCtx =
-                
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
-        final DefaultRescaleManager testInstance =
-                hardRescalePossibleCtx.createTestInstanceInSoftRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
-        hardRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
-        assertFinalStateWithRescale(hardRescalePossibleCtx);
-    }
-
-    @Test
-    void testSufficientChangeInHardRescalePhase() {
-        final TestingRescaleManagerContext hardRescalePossibleCtx =
-                
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
-        final DefaultRescaleManager testInstance =
-                hardRescalePossibleCtx.createTestInstanceInHardRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
-        testInstance.onTrigger();
-
-        assertFinalStateWithRescale(hardRescalePossibleCtx);
-    }
-
-    @Test
-    void 
testSufficientChangeInCooldownWithSubsequentDesiredChangeInSoftRescalePhase() {
-        final TestingRescaleManagerContext ctx =
-                
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
-        final DefaultRescaleManager testInstance = 
ctx.createTestInstanceInCooldownPhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        ctx.transitionIntoSoftScalingTimeframe();
-
-        ctx.withDesiredRescaling();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertThat(ctx.rescaleWasTriggered()).isTrue();
-        assertThat(ctx.numberOfTasksWaiting())
-                .as(
-                        "There should be a task scheduled that allows 
transitioning into hard-rescaling phase.")
-                .isEqualTo(3);
-    }
-
-    @Test
-    void testSufficientChangeWithSubsequentDesiredChangeInSoftRescalePhase() {
-        final TestingRescaleManagerContext ctx =
-                
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
-        final DefaultRescaleManager testInstance = 
ctx.createTestInstanceInSoftRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        assertThat(ctx.numberOfTasksWaiting())
-                .as(
-                        "There should be a task scheduled that allows 
transitioning into hard-rescaling phase.")
-                .isEqualTo(2);
-
-        ctx.withDesiredRescaling();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertThat(ctx.rescaleWasTriggered()).isTrue();
-    }
-
-    @Test
-    void
-            
testRevokedSufficientChangeInSoftRescalePhaseWithSubsequentSufficientChangeInHardRescalingPhase()
 {
-        final TestingRescaleManagerContext ctx =
-                
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
-        final DefaultRescaleManager testInstance = 
ctx.createTestInstanceInSoftRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        assertThat(ctx.numberOfTasksWaiting())
-                .as(
-                        "There should be a task scheduled that allows 
transitioning into hard-rescaling phase.")
-                .isEqualTo(2);
-
-        ctx.revertAnyParallelismImprovements();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        assertThat(ctx.numberOfTasksWaiting())
-                .as(
-                        "There should be a task scheduled that allows 
transitioning into hard-rescaling phase.")
-                .isEqualTo(2);
-
-        ctx.transitionIntoHardScalingTimeframe();
-
-        assertThat(ctx.rescaleWasTriggered())
-                .as(
-                        "No rescaling should have been triggered because of 
the previous revert of the additional resources.")
-                .isFalse();
-        assertThat(ctx.additionalTasksWaiting())
-                .as(
-                        "The transition to hard-rescaling should have happened 
without any additional tasks in waiting state.")
-                .isTrue();
-
-        ctx.withSufficientRescaling();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertFinalStateWithRescale(ctx);
-    }
-
-    @Test
-    void 
testRevokedChangeInHardRescalingPhaseCausesWithSubsequentSufficientChange() {
-        final TestingRescaleManagerContext ctx = 
TestingRescaleManagerContext.stableContext();
-        final DefaultRescaleManager testInstance = 
ctx.createTestInstanceInHardRescalePhase();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertThat(ctx.rescaleWasTriggered()).isFalse();
-        assertThat(ctx.additionalTasksWaiting()).isTrue();
-
-        ctx.withSufficientRescaling();
-
-        testInstance.onChange();
-
-        assertIntermediateStateWithoutRescale(ctx);
-
-        testInstance.onTrigger();
-
-        assertFinalStateWithRescale(ctx);
-    }
-
-    private static void 
assertIntermediateStateWithoutRescale(TestingRescaleManagerContext ctx) {
-        assertThat(ctx.rescaleWasTriggered())
-                .as("The rescale should not have been triggered, yet.")
-                .isFalse();
-        assertThat(ctx.additionalTasksWaiting())
-                .as("There should be still tasks being scheduled.")
-                .isTrue();
-    }
-
-    private static void 
assertFinalStateWithRescale(TestingRescaleManagerContext ctx) {
-        assertThat(ctx.rescaleWasTriggered())
-                .as("The rescale should have been triggered already.")
-                .isTrue();
-        assertThat(ctx.additionalTasksWaiting())
-                .as("All scheduled tasks should have been executed.")
-                .isTrue();
-    }
-
-    /**
-     * {@code TestingRescaleManagerContext} provides methods for adjusting the 
elapsed time and for
-     * adjusting the available resources for rescaling.
-     */
-    private static class TestingRescaleManagerContext implements 
RescaleManager.Context {
-
-        // default configuration values to allow for easy transitioning 
between the phases
-        private static final Duration SCALING_MIN = Duration.ofHours(1);
-        private static final Duration SCALING_MAX = Duration.ofHours(2);
-
-        // configuration that defines what kind of rescaling would be possible
-        private boolean hasSufficientResources = false;
-        private boolean hasDesiredResources = false;
-
-        // internal state used for assertions
-        private final AtomicBoolean rescaleTriggered = new AtomicBoolean();
-        private final SortedMap<Instant, List<Runnable>> scheduledTasks = new 
TreeMap<>();
-
-        // Instant.MIN makes debugging easier because timestamps become 
human-readable
-        private final Instant initializationTime = Instant.MIN;
-        private Duration elapsedTime = Duration.ZERO;
-
-        // ///////////////////////////////////////////////
-        // Context creation
-        // ///////////////////////////////////////////////
-
-        public static TestingRescaleManagerContext stableContext() {
-            return new TestingRescaleManagerContext();
-        }
-
-        private TestingRescaleManagerContext() {
-            // no rescaling is enabled by default
-            revertAnyParallelismImprovements();
-        }
-
-        public void revertAnyParallelismImprovements() {
-            this.hasSufficientResources = false;
-            this.hasDesiredResources = false;
-        }
-
-        public TestingRescaleManagerContext withDesiredRescaling() {
-            // having desired resources should also mean that the sufficient 
resources are met
-            this.hasSufficientResources = true;
-            this.hasDesiredResources = true;
-
-            return this;
-        }
-
-        public TestingRescaleManagerContext withSufficientRescaling() {
-            this.hasSufficientResources = true;
-            this.hasDesiredResources = false;
-
-            return this;
-        }
-
-        // ///////////////////////////////////////////////
-        // RescaleManager.Context interface methods
-        // ///////////////////////////////////////////////
-
-        @Override
-        public boolean hasSufficientResources() {
-            return this.hasSufficientResources;
-        }
-
-        @Override
-        public boolean hasDesiredResources() {
-            return this.hasDesiredResources;
-        }
-
-        @Override
-        public void rescale() {
-            rescaleTriggered.set(true);
-        }
-
-        @Override
-        public void scheduleOperation(Runnable callback, Duration delay) {
-            final Instant triggerTime =
-                    
Objects.requireNonNull(initializationTime).plus(elapsedTime).plus(delay);
-            if (!scheduledTasks.containsKey(triggerTime)) {
-                scheduledTasks.put(triggerTime, new ArrayList<>());
-            }
-
-            scheduledTasks.get(triggerTime).add(callback);
-        }
-
-        // ///////////////////////////////////////////////
-        // Test instance creation
-        // ///////////////////////////////////////////////
-
-        /**
-         * Creates the {@code DefaultRescaleManager} test instance and 
transitions into a period in
-         * time where the instance is in cooldown phase.
-         */
-        public DefaultRescaleManager createTestInstanceInCooldownPhase() {
-            return createTestInstance(this::transitionIntoCooldownTimeframe);
-        }
-
-        /**
-         * Creates the {@code DefaultRescaleManager} test instance and 
transitions into a period in
-         * time where the instance is in soft-rescaling phase.
-         */
-        public DefaultRescaleManager createTestInstanceInSoftRescalePhase() {
-            return 
createTestInstance(this::transitionIntoSoftScalingTimeframe);
-        }
-
-        /**
-         * Creates the {@code DefaultRescaleManager} test instance and 
transitions into a period in
-         * time where the instance is in hard-rescaling phase.
-         */
-        public DefaultRescaleManager createTestInstanceInHardRescalePhase() {
-            return 
createTestInstance(this::transitionIntoHardScalingTimeframe);
-        }
-
-        /**
-         * Initializes the test instance and sets the context's elapsed time 
based on the passed
-         * callback.
-         */
-        private DefaultRescaleManager createTestInstance(Runnable 
timeTransitioning) {
-            final DefaultRescaleManager testInstance =
-                    new DefaultRescaleManager(
-                            initializationTime,
-                            // clock that returns the time based on the 
configured elapsedTime
-                            () -> 
Objects.requireNonNull(initializationTime).plus(elapsedTime),
-                            this,
-                            SCALING_MIN,
-                            SCALING_MAX,
-                            Duration.ofHours(5)) {
-                        @Override
-                        public void onChange() {
-                            super.onChange();
-
-                            // hack to avoid calling this method in every test 
method
-                            // we want to trigger tasks that are meant to run 
right-away
-                            
TestingRescaleManagerContext.this.triggerOutdatedTasks();
-                        }
-
-                        @Override
-                        public void onTrigger() {
-                            super.onTrigger();
-
-                            // hack to avoid calling this method in every test 
method
-                            // we want to trigger tasks that are meant to run 
right-away
-                            
TestingRescaleManagerContext.this.triggerOutdatedTasks();
-                        }
-                    };
-
-            timeTransitioning.run();
-            return testInstance;
-        }
-
-        // ///////////////////////////////////////////////
-        // Time-adjustment functionality
-        // ///////////////////////////////////////////////
-
-        /**
-         * Transitions the context's time to a moment that falls into the test 
instance's cooldown
-         * phase.
-         */
-        public void transitionIntoCooldownTimeframe() {
-            this.elapsedTime = SCALING_MIN.dividedBy(2);
-            this.triggerOutdatedTasks();
-        }
-
-        /**
-         * Transitions the context's time to a moment that falls into the test 
instance's
-         * soft-scaling phase.
-         */
-        public void transitionIntoSoftScalingTimeframe() {
-            // the state transition is scheduled based on the current event's 
time rather than the
-            // initializationTime
-            this.elapsedTime = elapsedTime.plus(SCALING_MIN);
-
-            // make sure that we're still below the scalingIntervalMax
-            this.elapsedTime = 
elapsedTime.plus(SCALING_MAX.minus(elapsedTime).dividedBy(2));
-            this.triggerOutdatedTasks();
-        }
-
-        /**
-         * Transitions the context's time to a moment that falls into the test 
instance's
-         * hard-scaling phase.
-         */
-        public void transitionIntoHardScalingTimeframe() {
-            // the state transition is scheduled based on the current event's 
time rather than the
-            // initializationTime
-            this.elapsedTime = elapsedTime.plus(SCALING_MAX).plusMinutes(1);
-            this.triggerOutdatedTasks();
-        }
-
-        private void triggerOutdatedTasks() {
-            while (!scheduledTasks.isEmpty()) {
-                final Instant timeOfExecution = scheduledTasks.firstKey();
-                if (!timeOfExecution.isAfter(
-                        
Objects.requireNonNull(initializationTime).plus(elapsedTime))) {
-                    
scheduledTasks.remove(timeOfExecution).forEach(Runnable::run);
-                } else {
-                    break;
-                }
-            }
-        }
-
-        // ///////////////////////////////////////////////
-        // Methods for verifying the context's state
-        // ///////////////////////////////////////////////
-
-        public boolean rescaleWasTriggered() {
-            return rescaleTriggered.get();
-        }
-
-        public int numberOfTasksWaiting() {
-            return scheduledTasks.size();
-        }
-
-        public boolean additionalTasksWaiting() {
-            return !scheduledTasks.isEmpty();
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
new file mode 100644
index 00000000000..ca90aa2a1c5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.testutils.ScheduledTask;
+import 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Cooldown;
+import static 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Phase;
+import static 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilized;
+import static 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilizing;
+import static 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Transitioning;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class DefaultStateTransitionManagerTest {
+
+    @Test
+    void testProperConfiguration() throws ConfigurationException {
+        final Duration cooldownTimeout = Duration.ofMillis(1337);
+        final Duration resourceStabilizationTimeout = Duration.ofMillis(7331);
+        final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, 
cooldownTimeout);
+        configuration.set(
+                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, 
resourceStabilizationTimeout);
+        configuration.set(
+                JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, 
maximumDelayForRescaleTrigger);
+
+        final DefaultStateTransitionManager testInstance =
+                DefaultStateTransitionManager.Factory.fromSettings(
+                                AdaptiveScheduler.Settings.of(configuration))
+                        .create(
+                                
TestingStateTransitionManagerContext.stableContext(),
+                                Instant.now());
+        assertThat(testInstance.cooldownTimeout).isEqualTo(cooldownTimeout);
+        assertThat(testInstance.resourceStabilizationTimeout)
+                .isEqualTo(resourceStabilizationTimeout);
+        
assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger);
+    }
+
+    @Test
+    void testTriggerWithoutChangeEventNoopInCooldownPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+        final DefaultStateTransitionManager testInstance = 
ctx.createTestInstanceInCooldownPhase();
+        triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+    }
+
+    @Test
+    void testTriggerWithoutChangeEventNoopInIdlingPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+        triggerWithoutPhaseMove(ctx, testInstance, Idling.class);
+    }
+
+    @Test
+    void testTriggerWithoutChangeEventNoopInTransitioningPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceInTransitioningPhase();
+        triggerWithoutPhaseMove(ctx, testInstance, Transitioning.class);
+    }
+
+    @Test
+    void testStateTransitionRightAfterCooldown() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+        final DefaultStateTransitionManager testInstance = 
ctx.createTestInstanceInCooldownPhase();
+
+        changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        ctx.transitionToInclusiveCooldownEnd();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Cooldown.class);
+
+        testInstance.onTrigger();
+
+        ctx.passTime(Duration.ofMillis(1));
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testDesiredChangeInCooldownPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+        final DefaultStateTransitionManager testInstance = 
ctx.createTestInstanceInCooldownPhase();
+
+        changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        ctx.transitionOutOfCooldownPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testDesiredChangeInIdlingPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+        testInstance.onChange();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testDesiredChangeInStabilizedPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceInStabilizedPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        withDesiredChange(ctx, testInstance);
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testDesiredResourcesInStabilizingPhaseAfterMaxTriggerDelay() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceWithoutStabilizationTimeout(
+                        manager -> {
+                            manager.onChange();
+                            
ctx.passTime(TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT);
+                        });
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        ctx.passMaxDelayTriggerTimeout();
+
+        withDesiredChange(ctx, testInstance);
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        ctx.passMaxDelayTriggerTimeout();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testNoResourcesChangeInCooldownPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                TestingStateTransitionManagerContext.stableContext();
+        final DefaultStateTransitionManager testInstance = 
ctx.createTestInstanceInCooldownPhase();
+
+        changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        ctx.transitionOutOfCooldownPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+    }
+
+    @Test
+    void testNoResourcesChangeInIdlingPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                TestingStateTransitionManagerContext.stableContext();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        changeWithoutPhaseMove(ctx, testInstance, Idling.class);
+
+        triggerWithoutPhaseMove(ctx, testInstance, Idling.class);
+    }
+
+    @Test
+    void testSufficientResourcesInStabilizingPhaseAfterMaxTriggerDelay() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceWithoutStabilizationTimeout(
+                        manager -> {
+                            manager.onChange();
+                            
ctx.passTime(TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT);
+                        });
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        ctx.passMaxDelayTriggerTimeout();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+    }
+
+    @Test
+    void testSufficientResourcesInStabilizedPhaseAfterMaxTriggerDelay() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceInStabilizedPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        ctx.passMaxDelayTriggerTimeout();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testSufficientChangeInCooldownPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance = 
ctx.createTestInstanceInCooldownPhase();
+
+        changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+        ctx.transitionOutOfCooldownPhase();
+
+        triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+        ctx.passResourceStabilizationTimeout();
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testSufficientChangeInIdlingPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+        testInstance.onChange();
+
+        triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+        ctx.passResourceStabilizationTimeout();
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testSufficientChangeInStabilizedPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceInStabilizedPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testSufficientChangeWithSubsequentDesiredChangeInStabilizingPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+        testInstance.onChange();
+
+        triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+        withDesiredChange(ctx, testInstance);
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, 
Stabilizing.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void 
testRevokedChangeInStabilizingPhaseWithSubsequentSufficientChangeInStabilizedPhase()
 {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceThatPassedCooldownPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+        testInstance.onChange();
+
+        triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+        ctx.withRevokeResources();
+
+        changeWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+        triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+        ctx.passResourceStabilizationTimeout();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        withSufficientChange(ctx, testInstance);
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        testInstance.onTrigger();
+
+        assertFinalStateTransitionHappened(ctx, testInstance);
+    }
+
+    @Test
+    void testRevokedChangeInStabilizedPhase() {
+        final TestingStateTransitionManagerContext ctx =
+                
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceInStabilizedPhase();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+        ctx.withRevokeResources();
+
+        testInstance.onTrigger();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+    }
+
+    @Test
+    void testScheduledTaskBeingIgnoredAfterStateChanged() {
+        final TestingStateTransitionManagerContext ctx =
+                TestingStateTransitionManagerContext.stableContext();
+        final DefaultStateTransitionManager testInstance =
+                ctx.createTestInstanceInStabilizedPhase();
+
+        final AtomicBoolean callbackCalled = new AtomicBoolean();
+        testInstance.scheduleFromNow(
+                () -> callbackCalled.set(true), Duration.ZERO, new 
TestPhase());
+        ctx.triggerOutdatedTasks();
+
+        assertThat(callbackCalled).isFalse();
+    }
+
+    private static class TestPhase extends Phase {
+        private TestPhase() {
+            super(Instant::now, null);
+        }
+    }
+
+    private static void assertPhaseWithoutStateTransition(
+            TestingStateTransitionManagerContext ctx,
+            DefaultStateTransitionManager testInstance,
+            Class<? extends Phase> expectedPhase) {
+        assertThat(ctx.stateTransitionWasTriggered()).isFalse();
+        assertThat(testInstance.getPhase()).isInstanceOf(expectedPhase);
+    }
+
+    private static void assertFinalStateTransitionHappened(
+            TestingStateTransitionManagerContext ctx, 
DefaultStateTransitionManager testInstance) {
+        assertThat(ctx.stateTransitionWasTriggered()).isTrue();
+        assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
+    }
+
+    private static void changeWithoutPhaseMove(
+            TestingStateTransitionManagerContext ctx,
+            DefaultStateTransitionManager testInstance,
+            Class<? extends Phase> expectedPhase) {
+        assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+
+        testInstance.onChange();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+    }
+
+    private static void triggerWithoutPhaseMove(
+            TestingStateTransitionManagerContext ctx,
+            DefaultStateTransitionManager testInstance,
+            Class<? extends Phase> expectedPhase) {
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+
+        testInstance.onTrigger();
+
+        assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+    }
+
+    private static void withSufficientChange(
+            TestingStateTransitionManagerContext ctx, 
DefaultStateTransitionManager testInstance) {
+
+        ctx.withSufficientResources();
+        testInstance.onChange();
+    }
+
+    private static void withDesiredChange(
+            TestingStateTransitionManagerContext ctx, 
DefaultStateTransitionManager testInstance) {
+
+        ctx.withDesiredResources();
+        testInstance.onChange();
+    }
+
+    /**
+     * {@code TestingStateTransitionManagerContext} provides methods for 
adjusting the elapsed time
+     * and for adjusting the available resources for rescaling.
+     */
+    private static class TestingStateTransitionManagerContext
+            implements StateTransitionManager.Context {
+
+        // default configuration values to allow for easy transitioning 
between the phases
+        private static final Duration COOLDOWN_TIMEOUT = Duration.ofHours(1);
+        private static final Duration RESOURCE_STABILIZATION_TIMEOUT = 
Duration.ofHours(2);
+        private static final Duration MAX_TRIGGER_DELAY =
+                RESOURCE_STABILIZATION_TIMEOUT.plus(Duration.ofMinutes(10));
+
+        // configuration that defines what kind of rescaling would be possible
+        private boolean hasSufficientResources = false;
+        private boolean hasDesiredResources = false;
+
+        // internal state used for assertions
+        private final AtomicBoolean transitionTriggered = new AtomicBoolean();
+        private final SortedMap<Instant, List<ScheduledTask<Object>>> 
scheduledTasks =
+                new TreeMap<>();
+
+        // Instant.MIN makes debugging easier because timestamps become 
human-readable
+        private final Instant initializationTime = Instant.MIN;
+        private Duration elapsedTime = Duration.ZERO;
+
+        // ///////////////////////////////////////////////
+        // Context creation
+        // ///////////////////////////////////////////////
+
+        public static TestingStateTransitionManagerContext stableContext() {
+            return new TestingStateTransitionManagerContext();
+        }
+
+        private TestingStateTransitionManagerContext() {
+            // no rescaling is enabled by default
+            withRevokeResources();
+        }
+
+        public TestingStateTransitionManagerContext withRevokeResources() {
+            this.hasSufficientResources = false;
+            this.hasDesiredResources = false;
+
+            return this;
+        }
+
+        public TestingStateTransitionManagerContext withDesiredResources() {
+            // having desired resources should also mean that the sufficient 
resources are met
+            this.hasSufficientResources = true;
+            this.hasDesiredResources = true;
+
+            return this;
+        }
+
+        public TestingStateTransitionManagerContext withSufficientResources() {
+            this.hasSufficientResources = true;
+            this.hasDesiredResources = false;
+
+            return this;
+        }
+
+        // ///////////////////////////////////////////////
+        // StateTransitionManager.Context interface methods
+        // ///////////////////////////////////////////////
+
+        @Override
+        public boolean hasSufficientResources() {
+            return this.hasSufficientResources;
+        }
+
+        @Override
+        public boolean hasDesiredResources() {
+            return this.hasDesiredResources;
+        }
+
+        @Override
+        public void transitionToSubsequentState() {
+            transitionTriggered.set(true);
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleOperation(Runnable callback, 
Duration delay) {
+            final Instant triggerTime =
+                    
Objects.requireNonNull(initializationTime).plus(elapsedTime).plus(delay);
+            if (!scheduledTasks.containsKey(triggerTime)) {
+                scheduledTasks.put(triggerTime, new ArrayList<>());
+            }
+            ScheduledTask<Object> scheduledTask =
+                    new ScheduledTask<>(Executors.callable(callback), 
delay.toMillis());
+            scheduledTasks.get(triggerTime).add(scheduledTask);
+            return scheduledTask;
+        }
+
+        // ///////////////////////////////////////////////
+        // Test instance creation
+        // ///////////////////////////////////////////////
+
+        /**
+         * Creates the {@code DefaultStateTransitionManager} test instance and 
advances into a
+         * period in time where the instance is in cooldown phase.
+         */
+        public DefaultStateTransitionManager 
createTestInstanceInCooldownPhase() {
+            return createTestInstance(ignored -> 
this.transitionIntoCooldownTimeframe());
+        }
+
+        /**
+         * Creates the {@code DefaultStateTransitionManager} test instance and 
advances into a
+         * period in time where the instance is in idling phase.
+         */
+        public DefaultStateTransitionManager 
createTestInstanceThatPassedCooldownPhase() {
+            return createTestInstance(ignored -> 
this.transitionOutOfCooldownPhase());
+        }
+
+        /**
+         * Creates the {@code DefaultStateTransitionManager} test instance and 
advances into a
+         * period in time where the instance is in stabilized phase.
+         */
+        public DefaultStateTransitionManager 
createTestInstanceInStabilizedPhase() {
+            return createTestInstance(
+                    manager -> {
+                        manager.onChange();
+                        passResourceStabilizationTimeout();
+                    });
+        }
+
+        /**
+         * Creates the {@code DefaultStateTransitionManager} test instance in 
terminal transitioning
+         * phase.
+         */
+        public DefaultStateTransitionManager 
createTestInstanceInTransitioningPhase() {
+            return createTestInstance(
+                    manager -> {
+                        manager.onChange();
+                        passResourceStabilizationTimeout();
+                        manager.onTrigger();
+                        clearStateTransition();
+                    });
+        }
+
+        /**
+         * Initializes the test instance with the default stabilization timeout and 
sets the context's
+         * elapsed time based on the passed callback.
+         */
+        public DefaultStateTransitionManager createTestInstance(
+                Consumer<DefaultStateTransitionManager> callback) {
+            return createTestInstance(callback, 
RESOURCE_STABILIZATION_TIMEOUT);
+        }
+
+        /**
+         * Initializes the test instance without stabilization timeout and 
sets the context's
+         * elapsed time based on the passed callback.
+         */
+        public DefaultStateTransitionManager 
createTestInstanceWithoutStabilizationTimeout(
+                Consumer<DefaultStateTransitionManager> callback) {
+            return createTestInstance(callback, null);
+        }
+
+        private DefaultStateTransitionManager createTestInstance(
+                Consumer<DefaultStateTransitionManager> callback,
+                @Nullable Duration resourceStabilizationTimeout) {
+            final DefaultStateTransitionManager testInstance =
+                    new DefaultStateTransitionManager(
+                            initializationTime,
+                            // clock that returns the time based on the 
configured elapsedTime
+                            () -> 
Objects.requireNonNull(initializationTime).plus(elapsedTime),
+                            this,
+                            
TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT,
+                            resourceStabilizationTimeout,
+                            
TestingStateTransitionManagerContext.MAX_TRIGGER_DELAY) {
+                        @Override
+                        public void onChange() {
+                            super.onChange();
+
+                            // hack to avoid calling this method in every test method
+                            // we want to trigger tasks that are meant to run right-away
+                            
TestingStateTransitionManagerContext.this.triggerOutdatedTasks();
+                        }
+
+                        @Override
+                        public void onTrigger() {
+                            super.onTrigger();
+
+                            // hack to avoid calling this method in every test method
+                            // we want to trigger tasks that are meant to run right-away
+                            
TestingStateTransitionManagerContext.this.triggerOutdatedTasks();
+                        }
+                    };
+
+            callback.accept(testInstance);
+            return testInstance;
+        }
+
+        // ///////////////////////////////////////////////
+        // Time-adjustment functionality
+        // ///////////////////////////////////////////////
+
+        /**
+         * Transitions the context's time to a moment that falls into the test instance's cooldown
+         * phase.
+         */
+        public void transitionIntoCooldownTimeframe() {
+            setElapsedTime(COOLDOWN_TIMEOUT.dividedBy(2));
+            this.triggerOutdatedTasks();
+        }
+
+        public void transitionOutOfCooldownPhase() {
+            this.setElapsedTime(COOLDOWN_TIMEOUT.plusMillis(1));
+        }
+
+        public void passResourceStabilizationTimeout() {
+            // resource stabilization is based on the current time
+            this.passTime(RESOURCE_STABILIZATION_TIMEOUT.plusMillis(1));
+        }
+
+        public void transitionToInclusiveCooldownEnd() {
+            setElapsedTime(COOLDOWN_TIMEOUT.minusMillis(1));
+        }
+
+        public void passMaxDelayTriggerTimeout() {
+            this.passTime(MAX_TRIGGER_DELAY.plusMillis(1));
+        }
+
+        public void passTime(Duration elapsed) {
+            setElapsedTime(this.elapsedTime.plus(elapsed));
+        }
+
+        public void setElapsedTime(Duration elapsedTime) {
+            Preconditions.checkState(
+                    this.elapsedTime.compareTo(elapsedTime) <= 0,
+                    "The elapsed time should monotonically increase.");
+            this.elapsedTime = elapsedTime;
+            this.triggerOutdatedTasks();
+        }
+
+        private void triggerOutdatedTasks() {
+            while (!scheduledTasks.isEmpty()) {
+                final Instant timeOfExecution = scheduledTasks.firstKey();
+                if (!timeOfExecution.isAfter(
+                        
Objects.requireNonNull(initializationTime).plus(elapsedTime))) {
+                    
scheduledTasks.remove(timeOfExecution).forEach(ScheduledTask::execute);
+                } else {
+                    break;
+                }
+            }
+        }
+
+        // ///////////////////////////////////////////////
+        // Methods for verifying the context's state
+        // ///////////////////////////////////////////////
+
+        public boolean stateTransitionWasTriggered() {
+            return transitionTriggered.get();
+        }
+
+        public void clearStateTransition() {
+            transitionTriggered.set(false);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 48ee70bfd29..b49c1c9b35d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -155,7 +155,7 @@ class ExecutingTest {
                     ctx,
                     ClassLoader.getSystemClassLoader(),
                     new ArrayList<>(),
-                    TestingRescaleManager.Factory.noOpFactory(),
+                    TestingStateTransitionManager.Factory.noOpFactory(),
                     1,
                     1,
                     Instant.now());
@@ -183,7 +183,7 @@ class ExecutingTest {
                                         ctx,
                                         ClassLoader.getSystemClassLoader(),
                                         new ArrayList<>(),
-                                        
TestingRescaleManager.Factory.noOpFactory(),
+                                        
TestingStateTransitionManager.Factory.noOpFactory(),
                                         1,
                                         1,
                                         Instant.now());
@@ -195,12 +195,13 @@ class ExecutingTest {
     @Test
     public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
         final AtomicBoolean rescaleTriggered = new AtomicBoolean();
-        final RescaleManager.Factory rescaleManagerFactory =
-                new TestingRescaleManager.Factory(() -> {}, () -> 
rescaleTriggered.set(true));
+        final StateTransitionManager.Factory stateTransitionManagerFactory =
+                new TestingStateTransitionManager.Factory(
+                        () -> {}, () -> rescaleTriggered.set(true));
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             final Executing testInstance =
                     new ExecutingStateBuilder()
-                            .setRescaleManagerFactory(rescaleManagerFactory)
+                            
.setStateTransitionManagerFactory(stateTransitionManagerFactory)
                             .build(ctx);
 
             assertThat(rescaleTriggered).isFalse();
@@ -212,13 +213,14 @@ class ExecutingTest {
     @Test
     public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
         final AtomicInteger rescaleTriggerCount = new AtomicInteger();
-        final RescaleManager.Factory rescaleManagerFactory =
-                new TestingRescaleManager.Factory(() -> {}, 
rescaleTriggerCount::incrementAndGet);
+        final StateTransitionManager.Factory stateTransitionManagerFactory =
+                new TestingStateTransitionManager.Factory(
+                        () -> {}, rescaleTriggerCount::incrementAndGet);
         final int rescaleOnFailedCheckpointsCount = 3;
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             final Executing testInstance =
                     new ExecutingStateBuilder()
-                            .setRescaleManagerFactory(rescaleManagerFactory)
+                            
.setStateTransitionManagerFactory(stateTransitionManagerFactory)
                             
.setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount)
                             .build(ctx);
 
@@ -254,13 +256,14 @@ class ExecutingTest {
     @Test
     public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws 
Exception {
         final AtomicInteger rescaleTriggeredCount = new AtomicInteger();
-        final RescaleManager.Factory rescaleManagerFactory =
-                new TestingRescaleManager.Factory(() -> {}, 
rescaleTriggeredCount::incrementAndGet);
+        final StateTransitionManager.Factory stateTransitionManagerFactory =
+                new TestingStateTransitionManager.Factory(
+                        () -> {}, rescaleTriggeredCount::incrementAndGet);
         final int rescaleOnFailedCheckpointsCount = 3;
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             final Executing testInstance =
                     new ExecutingStateBuilder()
-                            .setRescaleManagerFactory(rescaleManagerFactory)
+                            
.setStateTransitionManagerFactory(stateTransitionManagerFactory)
                             
.setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount)
                             .build(ctx);
 
@@ -581,8 +584,8 @@ class ExecutingTest {
         final Queue<String> actualEvents = new ArrayDeque<>();
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             new ExecutingStateBuilder()
-                    .setRescaleManagerFactory(
-                            new TestingRescaleManager.Factory(
+                    .setStateTransitionManagerFactory(
+                            new TestingStateTransitionManager.Factory(
                                     () -> actualEvents.add(onChangeEventLabel),
                                     () -> 
actualEvents.add(onTriggerEventLabel)))
                     .build(ctx);
@@ -606,8 +609,8 @@ class ExecutingTest {
                 TestingDefaultExecutionGraphBuilder.newBuilder()
                         .build(EXECUTOR_EXTENSION.getExecutor());
         private OperatorCoordinatorHandler operatorCoordinatorHandler;
-        private RescaleManager.Factory rescaleManagerFactory =
-                TestingRescaleManager.Factory.noOpFactory();
+        private StateTransitionManager.Factory stateTransitionManagerFactory =
+                TestingStateTransitionManager.Factory.noOpFactory();
         private int rescaleOnFailedCheckpointCount = 1;
 
         private ExecutingStateBuilder() throws JobException, 
JobExecutionException {
@@ -625,9 +628,9 @@ class ExecutingTest {
             return this;
         }
 
-        public ExecutingStateBuilder setRescaleManagerFactory(
-                RescaleManager.Factory rescaleManagerFactory) {
-            this.rescaleManagerFactory = rescaleManagerFactory;
+        public ExecutingStateBuilder setStateTransitionManagerFactory(
+                StateTransitionManager.Factory stateTransitionManagerFactory) {
+            this.stateTransitionManagerFactory = stateTransitionManagerFactory;
             return this;
         }
 
@@ -649,7 +652,7 @@ class ExecutingTest {
                         ctx,
                         ClassLoader.getSystemClassLoader(),
                         new ArrayList<>(),
-                        rescaleManagerFactory,
+                        stateTransitionManagerFactory,
                         1,
                         rescaleOnFailedCheckpointCount,
-                        // will be ignored by the TestingRescaleManager.Factory
+                        // will be ignored by the TestingStateTransitionManager.Factory
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
similarity index 70%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
index cc435ea4f7b..3f4573cd74e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import java.time.Instant;
 
-public class TestingRescaleManager implements RescaleManager {
+/** Testing implementation for {@link StateTransitionManager}. */
+public class TestingStateTransitionManager implements StateTransitionManager {
 
     private final Runnable onChangeRunnable;
     private final Runnable onTriggerRunnable;
 
-    private TestingRescaleManager(Runnable onChangeRunnable, Runnable 
onTriggerRunnable) {
+    private TestingStateTransitionManager(Runnable onChangeRunnable, Runnable 
onTriggerRunnable) {
         this.onChangeRunnable = onChangeRunnable;
         this.onTriggerRunnable = onTriggerRunnable;
     }
@@ -40,12 +41,15 @@ public class TestingRescaleManager implements RescaleManager {
         this.onTriggerRunnable.run();
     }
 
-    public static class Factory implements RescaleManager.Factory {
+    /**
+     * {@code Factory} implementation for creating {@code TestingStateTransitionManager} instances.
+     */
+    public static class Factory implements StateTransitionManager.Factory {
 
         private final Runnable onChangeRunnable;
         private final Runnable onTriggerRunnable;
 
-        public static TestingRescaleManager.Factory noOpFactory() {
+        public static TestingStateTransitionManager.Factory noOpFactory() {
             return new Factory(() -> {}, () -> {});
         }
 
@@ -55,8 +59,8 @@ public class TestingRescaleManager implements RescaleManager {
         }
 
         @Override
-        public RescaleManager create(Context ignoredContext, Instant 
ignoredLastRescale) {
-            return new TestingRescaleManager(onChangeRunnable, 
onTriggerRunnable);
+        public StateTransitionManager create(Context ignoredContext, Instant 
ignoredLastRescale) {
+            return new TestingStateTransitionManager(onChangeRunnable, 
onTriggerRunnable);
         }
     }
 }

Reply via email to