This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f303a218f4 [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state
1f303a218f4 is described below

commit 1f303a218f47aa1a8eaf1323fc2a5debe914762d
Author: Zdenek Tison <[email protected]>
AuthorDate: Fri Jul 19 11:00:10 2024 +0200

    [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  50 +++-
 .../adaptive/DefaultStateTransitionManager.java    |  88 +++----
 .../runtime/scheduler/adaptive/Executing.java      |  12 +-
 .../scheduler/adaptive/StateTransitionManager.java |  11 -
 .../scheduler/adaptive/WaitingForResources.java    | 109 ++++-----
 .../adaptive/AdaptiveSchedulerBuilder.java         |   8 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 193 +++++++++++++---
 .../DefaultStateTransitionManagerTest.java         |  34 +--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  51 +++--
 .../adaptive/TestingStateTransitionManager.java    |  39 ++--
 .../adaptive/WaitingForResourcesTest.java          | 255 ++++++++-------------
 11 files changed, 445 insertions(+), 405 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index ebcfc369c82..b5b98984a11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -136,6 +136,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -185,6 +187,24 @@ public class AdaptiveScheduler
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveScheduler.class);
 
+    /**
+     * Named callback interface for creating {@code StateTransitionManager} 
instances. This internal
+     * interface allows for easier testing of the parameter injection in a 
unit test.
+     *
+     * @see
+     *     
DefaultStateTransitionManager#DefaultStateTransitionManager(StateTransitionManager.Context,
+     *     Duration, Duration, Duration, Temporal)
+     */
+    @FunctionalInterface
+    interface StateTransitionManagerFactory {
+        StateTransitionManager create(
+                StateTransitionManager.Context context,
+                Duration cooldownTimeout,
+                @Nullable Duration resourceStabilizationTimeout,
+                Duration maximumDelayForTrigger,
+                Temporal lastStateTransition);
+    }
+
     /**
      * Consolidated settings for the adaptive scheduler. This class is used to 
avoid passing around
      * multiple config options.
@@ -349,7 +369,7 @@ public class AdaptiveScheduler
     }
 
     private final Settings settings;
-    private final StateTransitionManager.Factory stateTransitionManagerFactory;
+    private final StateTransitionManagerFactory stateTransitionManagerFactory;
 
     private final JobGraph jobGraph;
 
@@ -427,7 +447,7 @@ public class AdaptiveScheduler
             throws JobExecutionException {
         this(
                 settings,
-                DefaultStateTransitionManager.Factory.fromSettings(settings),
+                DefaultStateTransitionManager::new,
                 (metricGroup, checkpointStatsListener) ->
                         new DefaultCheckpointStatsTracker(
                                 
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
@@ -455,7 +475,7 @@ public class AdaptiveScheduler
     @VisibleForTesting
     AdaptiveScheduler(
             Settings settings,
-            StateTransitionManager.Factory stateTransitionManagerFactory,
+            StateTransitionManagerFactory stateTransitionManagerFactory,
             BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener, 
CheckpointStatsTracker>
                     checkpointStatsTrackerFactory,
             JobGraph jobGraph,
@@ -1143,10 +1163,20 @@ public class AdaptiveScheduler
                         this,
                         LOG,
                         settings.getInitialResourceAllocationTimeout(),
-                        settings.getResourceStabilizationTimeout(),
+                        this::createWaitingForResourceStateTransitionManager,
                         previousExecutionGraph));
     }
 
+    private StateTransitionManager 
createWaitingForResourceStateTransitionManager(
+            StateTransitionManager.Context ctx) {
+        return stateTransitionManagerFactory.create(
+                ctx,
+                Duration.ZERO, // skip cooldown phase
+                settings.getResourceStabilizationTimeout(),
+                Duration.ZERO, // trigger immediately once the stabilization 
phase is over
+                Instant.now());
+    }
+
     private void declareDesiredResources() {
         final ResourceCounter newDesiredResources = 
calculateDesiredResources();
 
@@ -1175,11 +1205,21 @@ public class AdaptiveScheduler
                         this,
                         userCodeClassLoader,
                         failureCollection,
-                        stateTransitionManagerFactory,
+                        this::createExecutingStateTransitionManager,
                         settings.getMinParallelismChangeForDesiredRescale(),
                         settings.getRescaleOnFailedCheckpointCount()));
     }
 
+    private StateTransitionManager createExecutingStateTransitionManager(
+            StateTransitionManager.Context ctx, Instant lastRescaleTimestamp) {
+        return stateTransitionManagerFactory.create(
+                ctx,
+                settings.getScalingIntervalMin(),
+                settings.getScalingIntervalMax(),
+                settings.getMaximumDelayForTriggeringRescale(),
+                lastRescaleTimestamp);
+    }
+
     @Override
     public void goToCanceling(
             ExecutionGraph executionGraph,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
index aa1ea2c965d..df7969572b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -69,42 +69,60 @@ public class DefaultStateTransitionManager implements 
StateTransitionManager {
     private final StateTransitionManager.Context transitionContext;
     private Phase phase;
     private final List<ScheduledFuture<?>> scheduledFutures;
+    @Nullable private final Duration resourceStabilizationTimeout;
+    private final Duration maxTriggerDelay;
 
-    @VisibleForTesting final Duration cooldownTimeout;
-    @Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
-    @VisibleForTesting final Duration maxTriggerDelay;
-
+    /**
+     * Creates a {@code DefaultStateTransitionManager} instance with the given 
parameters.
+     *
+     * @param transitionContext The context for the {@code 
StateTransitionManager}.
+     * @param cooldownTimeout The timeout for the cooldown phase.
+     * @param resourceStabilizationTimeout The timeout for the resource 
stabilization phase.
+     * @param maxTriggerDelay The maximum delay for triggering a {@link 
AdaptiveScheduler}'s state
+     *     transition if only sufficient resources are available.
+     * @param initializationTime The last state transition timestamp of {@link 
AdaptiveScheduler}'s
+     *     state machine.
+     */
     DefaultStateTransitionManager(
-            Temporal initializationTime,
-            StateTransitionManager.Context transitionContext,
+            Context transitionContext,
             Duration cooldownTimeout,
             @Nullable Duration resourceStabilizationTimeout,
-            Duration maxTriggerDelay) {
+            Duration maxTriggerDelay,
+            Temporal initializationTime) {
         this(
-                initializationTime,
                 Instant::now,
                 transitionContext,
                 cooldownTimeout,
                 resourceStabilizationTimeout,
-                maxTriggerDelay);
+                maxTriggerDelay,
+                initializationTime);
     }
 
     @VisibleForTesting
     DefaultStateTransitionManager(
-            Temporal initializationTime,
             Supplier<Temporal> clock,
-            StateTransitionManager.Context transitionContext,
+            Context transitionContext,
             Duration cooldownTimeout,
             @Nullable Duration resourceStabilizationTimeout,
-            Duration maxTriggerDelay) {
+            Duration maxTriggerDelay,
+            Temporal initializationTime) {
 
         this.clock = clock;
+        Preconditions.checkArgument(
+                !maxTriggerDelay.isNegative(), "Max trigger delay must not be 
negative");
         this.maxTriggerDelay = maxTriggerDelay;
-        this.cooldownTimeout = cooldownTimeout;
+        Preconditions.checkArgument(
+                resourceStabilizationTimeout == null || 
!resourceStabilizationTimeout.isNegative(),
+                "Resource stabilization timeout must not be negative");
         this.resourceStabilizationTimeout = resourceStabilizationTimeout;
-        this.transitionContext = transitionContext;
+        this.transitionContext = Preconditions.checkNotNull(transitionContext);
         this.scheduledFutures = new ArrayList<>();
-        this.phase = new Cooldown(initializationTime, clock, this, 
cooldownTimeout);
+        this.phase =
+                new Cooldown(
+                        Preconditions.checkNotNull(initializationTime),
+                        clock,
+                        this,
+                        Preconditions.checkNotNull(cooldownTimeout));
     }
 
     @Override
@@ -176,46 +194,6 @@ public class DefaultStateTransitionManager implements 
StateTransitionManager {
         }
     }
 
-    /** Factory for creating {@link DefaultStateTransitionManager} instances. 
*/
-    public static class Factory implements StateTransitionManager.Factory {
-
-        private final Duration cooldownTimeout;
-        @Nullable private final Duration resourceStabilizationTimeout;
-        private final Duration maximumDelayForTrigger;
-
-        /**
-         * Creates a {@code Factory} instance based on the {@link 
AdaptiveScheduler}'s {@code
-         * Settings} for rescaling.
-         */
-        public static Factory fromSettings(AdaptiveScheduler.Settings 
settings) {
-            // it's not ideal that we use a AdaptiveScheduler internal class 
here. We might want to
-            // change that as part of a more general alignment of the 
rescaling configuration.
-            return new Factory(
-                    settings.getScalingIntervalMin(),
-                    settings.getScalingIntervalMax(),
-                    settings.getMaximumDelayForTriggeringRescale());
-        }
-
-        private Factory(
-                Duration cooldownTimeout,
-                @Nullable Duration resourceStabilizationTimeout,
-                Duration maximumDelayForTrigger) {
-            this.cooldownTimeout = cooldownTimeout;
-            this.resourceStabilizationTimeout = resourceStabilizationTimeout;
-            this.maximumDelayForTrigger = maximumDelayForTrigger;
-        }
-
-        @Override
-        public DefaultStateTransitionManager create(Context context, Instant 
lastStateTransition) {
-            return new DefaultStateTransitionManager(
-                    lastStateTransition,
-                    context,
-                    cooldownTimeout,
-                    resourceStabilizationTimeout,
-                    maximumDelayForTrigger);
-        }
-    }
-
     /**
      * A phase in the state machine of the {@link 
DefaultStateTransitionManager}. Each phase is
      * responsible for a specific part of the state transition process.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index f7dcb8d97cf..e78bcfac10f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -54,6 +54,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 /** State which represents a running job with an {@link ExecutionGraph} and 
assigned slots. */
@@ -77,7 +78,8 @@ class Executing extends StateWithExecutionGraph
             Context context,
             ClassLoader userCodeClassLoader,
             List<ExceptionHistoryEntry> failureCollection,
-            StateTransitionManager.Factory stateTransitionManagerFactory,
+            BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                    stateTransitionManagerFactory,
             int minParallelismChangeForRescale,
             int rescaleOnFailedCheckpointCount,
             Instant lastRescale) {
@@ -96,7 +98,7 @@ class Executing extends StateWithExecutionGraph
         this.sufficientResourcesController = new 
EnforceParallelismChangeRescalingController();
         this.desiredResourcesController =
                 new 
EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale);
-        this.stateTransitionManager = 
stateTransitionManagerFactory.create(this, lastRescale);
+        this.stateTransitionManager = 
stateTransitionManagerFactory.apply(this, lastRescale);
 
         Preconditions.checkArgument(
                 rescaleOnFailedCheckpointCount > 0,
@@ -328,7 +330,8 @@ class Executing extends StateWithExecutionGraph
         private final OperatorCoordinatorHandler operatorCoordinatorHandler;
         private final ClassLoader userCodeClassLoader;
         private final List<ExceptionHistoryEntry> failureCollection;
-        private final StateTransitionManager.Factory 
stateTransitionManagerFactory;
+        private final BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                stateTransitionManagerFactory;
         private final int minParallelismChangeForRescale;
         private final int rescaleOnFailedCheckpointCount;
 
@@ -340,7 +343,8 @@ class Executing extends StateWithExecutionGraph
                 Context context,
                 ClassLoader userCodeClassLoader,
                 List<ExceptionHistoryEntry> failureCollection,
-                StateTransitionManager.Factory stateTransitionManagerFactory,
+                BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                        stateTransitionManagerFactory,
                 int minParallelismChangeForRescale,
                 int rescaleOnFailedCheckpointCount) {
             this.context = context;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
index a6a6aaaca2e..322e8aae373 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import java.time.Duration;
-import java.time.Instant;
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -70,14 +69,4 @@ public interface StateTransitionManager {
          */
         ScheduledFuture<?> scheduleOperation(Runnable callback, Duration 
delay);
     }
-
-    /** Interface for creating {@code StateTransitionManager} instances. */
-    interface Factory {
-
-        /**
-         * Creates a {@code StateTransitionManager} instance for the given 
{@code Context} and
-         * previous state transition time.
-         */
-        StateTransitionManager create(Context context, Instant 
lastStateTransition);
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index d7933984db3..01f084bc093 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -20,11 +20,8 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.clock.Clock;
-import org.apache.flink.util.clock.SystemClock;
 
 import org.slf4j.Logger;
 
@@ -32,57 +29,43 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.concurrent.ScheduledFuture;
+import java.util.function.Function;
 
 /**
  * State which describes that the scheduler is waiting for resources in order 
to execute the job.
  */
-class WaitingForResources extends StateWithoutExecutionGraph implements 
ResourceListener {
+class WaitingForResources extends StateWithoutExecutionGraph
+        implements ResourceListener, StateTransitionManager.Context {
 
     private final Context context;
 
-    private final Clock clock;
-
-    /** If set, there's an ongoing deadline waiting for a resource 
stabilization. */
-    @Nullable private Deadline resourceStabilizationDeadline;
-
-    private final Duration resourceStabilizationTimeout;
-
     @Nullable private ScheduledFuture<?> resourceTimeoutFuture;
 
     @Nullable private final ExecutionGraph previousExecutionGraph;
 
+    private final StateTransitionManager stateTransitionManager;
+
     @VisibleForTesting
     WaitingForResources(
             Context context,
             Logger log,
             Duration initialResourceAllocationTimeout,
-            Duration resourceStabilizationTimeout) {
-        this(
-                context,
-                log,
-                initialResourceAllocationTimeout,
-                resourceStabilizationTimeout,
-                SystemClock.getInstance(),
-                null);
+            Function<StateTransitionManager.Context, StateTransitionManager>
+                    stateTransitionManagerFactory) {
+        this(context, log, initialResourceAllocationTimeout, null, 
stateTransitionManagerFactory);
     }
 
     WaitingForResources(
             Context context,
             Logger log,
             Duration initialResourceAllocationTimeout,
-            Duration resourceStabilizationTimeout,
-            Clock clock,
-            @Nullable ExecutionGraph previousExecutionGraph) {
+            @Nullable ExecutionGraph previousExecutionGraph,
+            Function<StateTransitionManager.Context, StateTransitionManager>
+                    stateTransitionManagerFactory) {
         super(context, log);
         this.context = Preconditions.checkNotNull(context);
-        this.resourceStabilizationTimeout =
-                Preconditions.checkNotNull(resourceStabilizationTimeout);
-        this.clock = clock;
         Preconditions.checkNotNull(initialResourceAllocationTimeout);
-
-        Preconditions.checkArgument(
-                !resourceStabilizationTimeout.isNegative(),
-                "Resource stabilization timeout must not be negative");
+        this.stateTransitionManager = 
stateTransitionManagerFactory.apply(this);
 
         // since state transitions are not allowed in state constructors, 
schedule calls for later.
         if (!initialResourceAllocationTimeout.isNegative()) {
@@ -91,7 +74,7 @@ class WaitingForResources extends StateWithoutExecutionGraph 
implements Resource
                             this, this::resourceTimeout, 
initialResourceAllocationTimeout);
         }
         this.previousExecutionGraph = previousExecutionGraph;
-        context.runIfState(this, 
this::checkDesiredOrSufficientResourcesAvailable, Duration.ZERO);
+        context.runIfState(this, this::checkPotentialStateTransition, 
Duration.ZERO);
     }
 
     @Override
@@ -99,6 +82,8 @@ class WaitingForResources extends StateWithoutExecutionGraph 
implements Resource
         if (resourceTimeoutFuture != null) {
             resourceTimeoutFuture.cancel(false);
         }
+        stateTransitionManager.close();
+        super.onLeave(newState);
     }
 
     @Override
@@ -108,51 +93,46 @@ class WaitingForResources extends 
StateWithoutExecutionGraph implements Resource
 
     @Override
     public void onNewResourcesAvailable() {
-        checkDesiredOrSufficientResourcesAvailable();
+        checkPotentialStateTransition();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        checkDesiredOrSufficientResourcesAvailable();
+        checkPotentialStateTransition();
     }
 
-    private void checkDesiredOrSufficientResourcesAvailable() {
-        if (context.hasDesiredResources()) {
-            createExecutionGraphWithAvailableResources();
-            return;
-        }
-
-        if (context.hasSufficientResources()) {
-            if (resourceStabilizationDeadline == null) {
-                resourceStabilizationDeadline =
-                        
Deadline.fromNowWithClock(resourceStabilizationTimeout, clock);
-            }
-            if (resourceStabilizationDeadline.isOverdue()) {
-                createExecutionGraphWithAvailableResources();
-            } else {
-                // schedule next resource check
-                context.runIfState(
-                        this,
-                        this::checkDesiredOrSufficientResourcesAvailable,
-                        resourceStabilizationDeadline.timeLeft());
-            }
-        } else {
-            // clear deadline due to insufficient resources
-            resourceStabilizationDeadline = null;
-        }
+    private void checkPotentialStateTransition() {
+        stateTransitionManager.onChange();
+        stateTransitionManager.onTrigger();
     }
 
     private void resourceTimeout() {
         getLogger()
                 .debug(
                         "Initial resource allocation timeout triggered: 
Creating ExecutionGraph with available resources.");
-        createExecutionGraphWithAvailableResources();
+        transitionToSubsequentState();
+    }
+
+    @Override
+    public boolean hasSufficientResources() {
+        return context.hasSufficientResources();
+    }
+
+    @Override
+    public boolean hasDesiredResources() {
+        return context.hasDesiredResources();
     }
 
-    private void createExecutionGraphWithAvailableResources() {
+    @Override
+    public void transitionToSubsequentState() {
         context.goToCreatingExecutionGraph(previousExecutionGraph);
     }
 
+    @Override
+    public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration 
delay) {
+        return context.runIfState(this, callback, delay);
+    }
+
     /** Context of the {@link WaitingForResources} state. */
     interface Context
             extends StateWithoutExecutionGraph.Context, 
StateTransitions.ToCreatingExecutionGraph {
@@ -188,20 +168,22 @@ class WaitingForResources extends 
StateWithoutExecutionGraph implements Resource
         private final Context context;
         private final Logger log;
         private final Duration initialResourceAllocationTimeout;
-        private final Duration resourceStabilizationTimeout;
         @Nullable private final ExecutionGraph previousExecutionGraph;
+        private final Function<StateTransitionManager.Context, 
StateTransitionManager>
+                stateTransitionManagerFactory;
 
         public Factory(
                 Context context,
                 Logger log,
                 Duration initialResourceAllocationTimeout,
-                Duration resourceStabilizationTimeout,
+                Function<StateTransitionManager.Context, 
StateTransitionManager>
+                        stateTransitionManagerFactory,
                 @Nullable ExecutionGraph previousExecutionGraph) {
             this.context = context;
             this.log = log;
             this.initialResourceAllocationTimeout = 
initialResourceAllocationTimeout;
-            this.resourceStabilizationTimeout = resourceStabilizationTimeout;
             this.previousExecutionGraph = previousExecutionGraph;
+            this.stateTransitionManagerFactory = stateTransitionManagerFactory;
         }
 
         public Class<WaitingForResources> getStateClass() {
@@ -213,9 +195,8 @@ class WaitingForResources extends 
StateWithoutExecutionGraph implements Resource
                     context,
                     log,
                     initialResourceAllocationTimeout,
-                    resourceStabilizationTimeout,
-                    SystemClock.getInstance(),
-                    previousExecutionGraph);
+                    previousExecutionGraph,
+                    stateTransitionManagerFactory);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 761dac96155..f35683c3f7b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -98,7 +98,8 @@ public class AdaptiveSchedulerBuilder {
     /**
      * {@code null} indicates that the default factory will be used based on 
the set configuration.
      */
-    @Nullable private StateTransitionManager.Factory 
stateTransitionManagerFactory = null;
+    @Nullable
+    private AdaptiveScheduler.StateTransitionManagerFactory 
stateTransitionManagerFactory = null;
 
     private BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener, 
CheckpointStatsTracker>
             checkpointStatsTrackerFactory =
@@ -226,7 +227,8 @@ public class AdaptiveSchedulerBuilder {
     }
 
     public AdaptiveSchedulerBuilder setStateTransitionManagerFactory(
-            @Nullable StateTransitionManager.Factory 
stateTransitionManagerFactory) {
+            @Nullable
+                    AdaptiveScheduler.StateTransitionManagerFactory 
stateTransitionManagerFactory) {
         this.stateTransitionManagerFactory = stateTransitionManagerFactory;
         return this;
     }
@@ -261,7 +263,7 @@ public class AdaptiveSchedulerBuilder {
         return new AdaptiveScheduler(
                 settings,
                 stateTransitionManagerFactory == null
-                        ? 
DefaultStateTransitionManager.Factory.fromSettings(settings)
+                        ? DefaultStateTransitionManager::new
                         : stateTransitionManagerFactory,
                 checkpointStatsTrackerFactory,
                 jobGraph,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 5bc3d859c48..88a0719bbbf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.TestingFailureEnricher;
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
@@ -130,6 +131,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.temporal.Temporal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -149,6 +151,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -2247,10 +2250,126 @@ public class AdaptiveSchedulerTest {
         assertThat(eventQueue.take()).as("Only one event should have been 
observed.").isEqualTo(1);
     }
 
+    @Test
+    void testGoToWaitingForResourcesConfiguresStateTransitionManagerFactory() 
throws Exception {
+        final OneShotLatch latch = new OneShotLatch();
+        final TestingStateTransitionManagerFactory factory =
+                new TestingStateTransitionManagerFactory(
+                        ctx ->
+                                
TestingStateTransitionManager.withOnChangeEventOnly(
+                                        () -> {
+                                            if (ctx instanceof 
WaitingForResources) {
+                                                latch.trigger();
+                                            }
+                                        }));
+
+        final Configuration configuration = new Configuration();
+        final Duration resourceStabilizationTimeout = Duration.ofMillis(10L);
+        configuration.set(
+                JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, 
resourceStabilizationTimeout);
+
+        scheduler =
+                new AdaptiveSchedulerBuilder(
+                                createJobGraph(),
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .setStateTransitionManagerFactory(factory)
+                        .setJobMasterConfiguration(configuration)
+                        .build();
+
+        // start scheduling to reach WaitingForResources state
+        singleThreadMainThreadExecutor.execute(scheduler::startScheduling);
+
+        // let's wait for the onChange event in WaitingForResources state.
+        latch.await();
+
+        
assertThat(scheduler.getState()).isInstanceOf(WaitingForResources.class);
+        assertThat(factory.cooldownTimeout).isEqualTo(Duration.ZERO);
+        assertThat(factory.maximumDelayForTrigger).isEqualTo(Duration.ZERO);
+        
assertThat(factory.resourceStabilizationTimeout).isEqualTo(resourceStabilizationTimeout);
+    }
+
+    @Test
+    void testGoToExecutingConfiguresStateTransitionManagerFactory() throws 
Exception {
+        final OneShotLatch latch = new OneShotLatch();
+        final TestingStateTransitionManagerFactory factory =
+                new TestingStateTransitionManagerFactory(
+                        ctx ->
+                                
TestingStateTransitionManager.withOnChangeEventOnly(
+                                        () -> {
+                                            if (ctx instanceof 
WaitingForResources) {
+                                                
ctx.transitionToSubsequentState();
+                                            }
+                                            if (ctx instanceof Executing) {
+                                                latch.trigger();
+                                            }
+                                        }));
+
+        final Configuration configuration = new Configuration();
+        final Duration scalingIntervalMin = Duration.ofMillis(1L);
+        final Duration scalingIntervalMax = Duration.ofMillis(5L);
+        final Duration maxDelayForTrigger = Duration.ofMillis(10L);
+
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, 
scalingIntervalMin);
+        configuration.set(JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, 
maxDelayForTrigger);
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, 
scalingIntervalMax);
+
+        scheduler =
+                new AdaptiveSchedulerBuilder(
+                                createJobGraph(),
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .setJobMasterConfiguration(configuration)
+                        
.setDeclarativeSlotPool(getSlotPoolWithFreeSlots(PARALLELISM))
+                        .setStateTransitionManagerFactory(factory)
+                        .build();
+
+        // start scheduling to reach Executing state
+        singleThreadMainThreadExecutor.execute(scheduler::startScheduling);
+
+        // let's wait for the onChange event in Executing state.
+        latch.await();
+
+        assertThat(scheduler.getState()).isInstanceOf(Executing.class);
+        assertThat(factory.cooldownTimeout).isEqualTo(scalingIntervalMin);
+        
assertThat(factory.maximumDelayForTrigger).isEqualTo(maxDelayForTrigger);
+        
assertThat(factory.resourceStabilizationTimeout).isEqualTo(scalingIntervalMax);
+    }
+
     // 
---------------------------------------------------------------------------------------------
     // Utils
     // 
---------------------------------------------------------------------------------------------
 
+    private static class TestingStateTransitionManagerFactory
+            implements AdaptiveScheduler.StateTransitionManagerFactory {
+
+        private final Function<StateTransitionManager.Context, 
StateTransitionManager>
+                stateTransitionManagerCreator;
+
+        private Duration cooldownTimeout;
+        private Duration resourceStabilizationTimeout;
+        private Duration maximumDelayForTrigger;
+
+        public TestingStateTransitionManagerFactory(
+                Function<StateTransitionManager.Context, 
StateTransitionManager>
+                        stateTransitionManagerCreator) {
+            this.stateTransitionManagerCreator = stateTransitionManagerCreator;
+        }
+
+        public StateTransitionManager create(
+                StateTransitionManager.Context context,
+                Duration cooldownTimeout,
+                @Nullable Duration resourceStabilizationTimeout,
+                Duration maximumDelayForTrigger,
+                Temporal ignoredTimestamp) {
+            this.cooldownTimeout = cooldownTimeout;
+            this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+            this.maximumDelayForTrigger = maximumDelayForTrigger;
+
+            return stateTransitionManagerCreator.apply(context);
+        }
+    }
+
     private AdaptiveScheduler createSchedulerThatReachesExecutingState(
             int parallelism,
             int onFailedCheckpointCount,
@@ -2274,30 +2393,7 @@ public class AdaptiveSchedulerTest {
                         .build();
         SchedulerTestingUtils.enableCheckpointing(jobGraph);
 
-        // testing SlotPool instance that would allow for the scheduler to 
transition to Executing
-        // state
-        final DeclarativeSlotPool slotPool =
-                new TestingDeclarativeSlotPoolBuilder()
-                        .setContainsFreeSlotFunction(allocationID -> true)
-                        .setReserveFreeSlotFunction(
-                                (allocationId, resourceProfile) ->
-                                        TestingPhysicalSlot.builder()
-                                                .withAllocationID(allocationId)
-                                                .build())
-                        .setGetFreeSlotTrackerSupplier(
-                                () ->
-                                        TestingFreeSlotTracker.newBuilder()
-                                                
.setGetFreeSlotsInformationSupplier(
-                                                        () ->
-                                                                
IntStream.range(0, parallelism)
-                                                                        
.mapToObj(
-                                                                               
 v ->
-                                                                               
         new TestingSlot())
-                                                                        
.collect(
-                                                                               
 Collectors.toSet()))
-                                                .build())
-                        .build();
-
+        final DeclarativeSlotPool slotPool = 
getSlotPoolWithFreeSlots(parallelism);
         final AtomicInteger eventCounter = new AtomicInteger();
         scheduler =
                 new AdaptiveSchedulerBuilder(
@@ -2307,14 +2403,23 @@ public class AdaptiveSchedulerTest {
                         .setJobMasterConfiguration(config)
                         .setDeclarativeSlotPool(slotPool)
                         .setStateTransitionManagerFactory(
-                                new TestingStateTransitionManager.Factory(
-                                        () -> {},
-                                        () -> {
-                                            singleThreadMainThreadExecutor
-                                                    
.assertRunningInMainThread();
-
-                                            
eventQueue.offer(eventCounter.getAndIncrement());
-                                        }))
+                                (context,
+                                        ignoredCooldown,
+                                        ignoredResourceStabilizationTimeout,
+                                        ignoredMaxTriggerDelay,
+                                        ignoredTimestamp) ->
+                                        
TestingStateTransitionManager.withOnTriggerEventOnly(
+                                                () -> {
+                                                    
singleThreadMainThreadExecutor
+                                                            
.assertRunningInMainThread();
+
+                                                    if (context instanceof 
WaitingForResources) {
+                                                        
context.transitionToSubsequentState();
+                                                    } else if (context 
instanceof Executing) {
+                                                        eventQueue.offer(
+                                                                
eventCounter.getAndIncrement());
+                                                    }
+                                                }))
                         .setCheckpointStatsTrackerFactory(
                                 (metricGroup, listener) -> {
                                     assertThat(statsListenerInstantiatedFuture)
@@ -2366,6 +2471,30 @@ public class AdaptiveSchedulerTest {
                 forMainThread());
     }
 
+    /**
+     * Creates a testing SlotPool instance that would allow for the scheduler 
to transition to
+     * Executing state.
+     */
+    private static DeclarativeSlotPool getSlotPoolWithFreeSlots(int freeSlots) 
{
+        return new TestingDeclarativeSlotPoolBuilder()
+                .setContainsFreeSlotFunction(allocationID -> true)
+                .setReserveFreeSlotFunction(
+                        (allocationId, resourceProfile) ->
+                                TestingPhysicalSlot.builder()
+                                        .withAllocationID(allocationId)
+                                        .build())
+                .setGetFreeSlotTrackerSupplier(
+                        () ->
+                                TestingFreeSlotTracker.newBuilder()
+                                        .setGetFreeSlotsInformationSupplier(
+                                                () ->
+                                                        IntStream.range(0, 
freeSlots)
+                                                                .mapToObj(v -> 
new TestingSlot())
+                                                                
.collect(Collectors.toSet()))
+                                        .build())
+                .build();
+    }
+
     private static JobGraph createJobGraph() {
         return streamingJobGraph(JOB_VERTEX);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
index ca90aa2a1c5..1fd9e5fd0fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.ScheduledTask;
 import 
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling;
-import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.jupiter.api.Test;
@@ -50,31 +47,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class DefaultStateTransitionManagerTest {
 
-    @Test
-    void testProperConfiguration() throws ConfigurationException {
-        final Duration cooldownTimeout = Duration.ofMillis(1337);
-        final Duration resourceStabilizationTimeout = Duration.ofMillis(7331);
-        final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
-
-        final Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, 
cooldownTimeout);
-        configuration.set(
-                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, 
resourceStabilizationTimeout);
-        configuration.set(
-                JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, 
maximumDelayForRescaleTrigger);
-
-        final DefaultStateTransitionManager testInstance =
-                DefaultStateTransitionManager.Factory.fromSettings(
-                                AdaptiveScheduler.Settings.of(configuration))
-                        .create(
-                                
TestingStateTransitionManagerContext.stableContext(),
-                                Instant.now());
-        assertThat(testInstance.cooldownTimeout).isEqualTo(cooldownTimeout);
-        assertThat(testInstance.resourceStabilizationTimeout)
-                .isEqualTo(resourceStabilizationTimeout);
-        
assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger);
-    }
-
     @Test
     void testTriggerWithoutChangeEventNoopInCooldownPhase() {
         final TestingStateTransitionManagerContext ctx =
@@ -623,13 +595,13 @@ class DefaultStateTransitionManagerTest {
                 @Nullable Duration resourceStabilizationTimeout) {
             final DefaultStateTransitionManager testInstance =
                     new DefaultStateTransitionManager(
-                            initializationTime,
                             // clock that returns the time based on the 
configured elapsedTime
                             () -> 
Objects.requireNonNull(initializationTime).plus(elapsedTime),
                             this,
-                            
TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT,
+                            COOLDOWN_TIMEOUT,
                             resourceStabilizationTimeout,
-                            
TestingStateTransitionManagerContext.MAX_TRIGGER_DELAY) {
+                            MAX_TRIGGER_DELAY,
+                            initializationTime) {
                         @Override
                         public void onChange() {
                             super.onChange();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index b49c1c9b35d..0fc8f9d2073 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -155,7 +156,7 @@ class ExecutingTest {
                     ctx,
                     ClassLoader.getSystemClassLoader(),
                     new ArrayList<>(),
-                    TestingStateTransitionManager.Factory.noOpFactory(),
+                    (context, ts) -> TestingStateTransitionManager.withNoOp(),
                     1,
                     1,
                     Instant.now());
@@ -183,7 +184,7 @@ class ExecutingTest {
                                         ctx,
                                         ClassLoader.getSystemClassLoader(),
                                         new ArrayList<>(),
-                                        
TestingStateTransitionManager.Factory.noOpFactory(),
+                                        (context, ts) -> 
TestingStateTransitionManager.withNoOp(),
                                         1,
                                         1,
                                         Instant.now());
@@ -195,9 +196,12 @@ class ExecutingTest {
     @Test
     public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
         final AtomicBoolean rescaleTriggered = new AtomicBoolean();
-        final StateTransitionManager.Factory stateTransitionManagerFactory =
-                new TestingStateTransitionManager.Factory(
-                        () -> {}, () -> rescaleTriggered.set(true));
+        final BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                stateTransitionManagerFactory =
+                        (context, ts) ->
+                                
TestingStateTransitionManager.withOnTriggerEventOnly(
+                                        () -> rescaleTriggered.set(true));
+
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             final Executing testInstance =
                     new ExecutingStateBuilder()
@@ -213,9 +217,12 @@ class ExecutingTest {
     @Test
     public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
         final AtomicInteger rescaleTriggerCount = new AtomicInteger();
-        final StateTransitionManager.Factory stateTransitionManagerFactory =
-                new TestingStateTransitionManager.Factory(
-                        () -> {}, rescaleTriggerCount::incrementAndGet);
+        final BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                stateTransitionManagerFactory =
+                        (context, ts) ->
+                                
TestingStateTransitionManager.withOnTriggerEventOnly(
+                                        rescaleTriggerCount::incrementAndGet);
+
         final int rescaleOnFailedCheckpointsCount = 3;
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             final Executing testInstance =
@@ -256,9 +263,12 @@ class ExecutingTest {
     @Test
     public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws 
Exception {
         final AtomicInteger rescaleTriggeredCount = new AtomicInteger();
-        final StateTransitionManager.Factory stateTransitionManagerFactory =
-                new TestingStateTransitionManager.Factory(
-                        () -> {}, rescaleTriggeredCount::incrementAndGet);
+        final BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                stateTransitionManagerFactory =
+                        (context, ts) ->
+                                
TestingStateTransitionManager.withOnTriggerEventOnly(
+                                        
rescaleTriggeredCount::incrementAndGet);
+
         final int rescaleOnFailedCheckpointsCount = 3;
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             final Executing testInstance =
@@ -585,9 +595,10 @@ class ExecutingTest {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             new ExecutingStateBuilder()
                     .setStateTransitionManagerFactory(
-                            new TestingStateTransitionManager.Factory(
-                                    () -> actualEvents.add(onChangeEventLabel),
-                                    () -> 
actualEvents.add(onTriggerEventLabel)))
+                            (context, ts) ->
+                                    new TestingStateTransitionManager(
+                                            () -> 
actualEvents.add(onChangeEventLabel),
+                                            () -> 
actualEvents.add(onTriggerEventLabel)))
                     .build(ctx);
 
             ctx.triggerExecutors();
@@ -609,8 +620,9 @@ class ExecutingTest {
                 TestingDefaultExecutionGraphBuilder.newBuilder()
                         .build(EXECUTOR_EXTENSION.getExecutor());
         private OperatorCoordinatorHandler operatorCoordinatorHandler;
-        private StateTransitionManager.Factory stateTransitionManagerFactory =
-                TestingStateTransitionManager.Factory.noOpFactory();
+        private BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                stateTransitionManagerFactory =
+                        (context, ts) -> 
TestingStateTransitionManager.withNoOp();
         private int rescaleOnFailedCheckpointCount = 1;
 
         private ExecutingStateBuilder() throws JobException, 
JobExecutionException {
@@ -629,7 +641,8 @@ class ExecutingTest {
         }
 
         public ExecutingStateBuilder setStateTransitionManagerFactory(
-                StateTransitionManager.Factory stateTransitionManagerFactory) {
+                BiFunction<StateTransitionManager.Context, Instant, 
StateTransitionManager>
+                        stateTransitionManagerFactory) {
             this.stateTransitionManagerFactory = stateTransitionManagerFactory;
             return this;
         }
@@ -652,10 +665,10 @@ class ExecutingTest {
                         ctx,
                         ClassLoader.getSystemClassLoader(),
                         new ArrayList<>(),
-                        stateTransitionManagerFactory,
+                        stateTransitionManagerFactory::apply,
                         1,
                         rescaleOnFailedCheckpointCount,
-                        // will be ignored by the TestingRescaleManager.Factory
+                        // will be ignored by the testing 
StateTransitionManager factory
                         Instant.now());
             } finally {
                 Preconditions.checkState(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
index 3f4573cd74e..ee3b35dda2b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
@@ -18,15 +18,25 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import java.time.Instant;
-
 /** Testing implementation for {@link StateTransitionManager}. */
 public class TestingStateTransitionManager implements StateTransitionManager {
 
     private final Runnable onChangeRunnable;
     private final Runnable onTriggerRunnable;
 
-    private TestingStateTransitionManager(Runnable onChangeRunnable, Runnable 
onTriggerRunnable) {
+    public static TestingStateTransitionManager withNoOp() {
+        return withOnTriggerEventOnly(() -> {});
+    }
+
+    public static TestingStateTransitionManager withOnChangeEventOnly(Runnable 
onChangeCallback) {
+        return new TestingStateTransitionManager(onChangeCallback, () -> {});
+    }
+
+    public static TestingStateTransitionManager 
withOnTriggerEventOnly(Runnable onTriggerCallback) {
+        return new TestingStateTransitionManager(() -> {}, onTriggerCallback);
+    }
+
+    public TestingStateTransitionManager(Runnable onChangeRunnable, Runnable 
onTriggerRunnable) {
         this.onChangeRunnable = onChangeRunnable;
         this.onTriggerRunnable = onTriggerRunnable;
     }
@@ -40,27 +50,4 @@ public class TestingStateTransitionManager implements 
StateTransitionManager {
     public void onTrigger() {
         this.onTriggerRunnable.run();
     }
-
-    /**
-     * {@code Factory} implementation for creating {@code 
TestingStateTransitionManager} instances.
-     */
-    public static class Factory implements StateTransitionManager.Factory {
-
-        private final Runnable onChangeRunnable;
-        private final Runnable onTriggerRunnable;
-
-        public static TestingStateTransitionManager.Factory noOpFactory() {
-            return new Factory(() -> {}, () -> {});
-        }
-
-        public Factory(Runnable onChangeRunnable, Runnable onTriggerRunnable) {
-            this.onChangeRunnable = onChangeRunnable;
-            this.onTriggerRunnable = onTriggerRunnable;
-        }
-
-        @Override
-        public StateTransitionManager create(Context ignoredContext, Instant 
ignoredLastRescale) {
-            return new TestingStateTransitionManager(onChangeRunnable, 
onTriggerRunnable);
-        }
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index a4a0dbec38a..a7b40cc50bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.core.testutils.ScheduledTask;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.ManualClock;
 
 import org.junit.jupiter.api.Test;
@@ -38,7 +37,9 @@ import java.util.Queue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -49,162 +50,154 @@ class WaitingForResourcesTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WaitingForResourcesTest.class);
 
-    private static final Duration STABILIZATION_TIMEOUT = 
Duration.ofSeconds(1);
+    private static final Duration DISABLED_RESOURCE_WAIT_TIMEOUT = 
Duration.ofSeconds(-1);
 
     @RegisterExtension private MockContext ctx = new MockContext();
 
     /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
     @Test
     void testTransitionToCreatingExecutionGraph() {
-        ctx.setHasDesiredResources(() -> true);
+        final AtomicBoolean onTriggerCalled = new AtomicBoolean();
+        final Function<StateTransitionManager.Context, StateTransitionManager>
+                stateTransitionManagerFactory =
+                        context ->
+                                new TestingStateTransitionManager(
+                                        () -> {
+                                            
assertThat(context.hasDesiredResources()).isTrue();
+                                            
assertThat(context.hasSufficientResources()).isTrue();
+
+                                            
context.transitionToSubsequentState();
+                                        },
+                                        () -> onTriggerCalled.set(true));
 
+        ctx.setHasDesiredResources(() -> true);
+        ctx.setHasSufficientResources(() -> true);
         ctx.setExpectCreatingExecutionGraph();
 
-        new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+        new WaitingForResources(
+                ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, 
stateTransitionManagerFactory);
 
         ctx.runScheduledTasks();
+
+        assertThat(onTriggerCalled.get()).isTrue();
     }
 
     @Test
     void testNotEnoughResources() {
+        final AtomicBoolean onChangeCalled = new AtomicBoolean();
+        final AtomicBoolean onTriggerCalled = new AtomicBoolean();
+        final Function<StateTransitionManager.Context, StateTransitionManager>
+                stateTransitionManagerFactory =
+                        context ->
+                                new TestingStateTransitionManager(
+                                        () -> {
+                                            onChangeCalled.set(true);
+
+                                            
assertThat(context.hasDesiredResources()).isFalse();
+                                            
assertThat(context.hasSufficientResources()).isFalse();
+                                        },
+                                        () -> onTriggerCalled.set(true));
+
         ctx.setHasDesiredResources(() -> false);
+        ctx.setHasSufficientResources(() -> false);
         WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                new WaitingForResources(
+                        ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, 
stateTransitionManagerFactory);
 
+        ctx.runScheduledTasks();
         // we expect no state transition.
-        wfr.onNewResourcesAvailable();
+        assertThat(ctx.hasStateTransition()).isFalse();
+        assertThat(onChangeCalled.get()).isTrue();
+        assertThat(onTriggerCalled.get()).isTrue();
     }
 
     @Test
     void testNotifyNewResourcesAvailable() {
-        ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
-        WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-        ctx.setHasDesiredResources(() -> true); // make resources available
-        ctx.setExpectCreatingExecutionGraph();
-        wfr.onNewResourcesAvailable(); // .. and notify
-    }
+        final AtomicInteger callsCounter = new AtomicInteger();
+        final Function<StateTransitionManager.Context, StateTransitionManager>
+                stateTransitionManagerFactory =
+                        context ->
+                                
TestingStateTransitionManager.withOnChangeEventOnly(
+                                        () -> {
+                                            if (callsCounter.incrementAndGet() 
== 1) {
+                                                // first call: not enough 
resources
+                                                
assertThat(context.hasDesiredResources()).isFalse();
+                                                
assertThat(context.hasSufficientResources())
+                                                        .isFalse();
+                                            }
+
+                                            if (context.hasDesiredResources()
+                                                    && 
context.hasSufficientResources()) {
+                                                
context.transitionToSubsequentState();
+                                            }
+                                        });
+
+        // initially, not enough resources
+        ctx.setHasDesiredResources(() -> false);
+        ctx.setHasSufficientResources(() -> false);
 
-    @Test
-    void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() {
-        Duration noStabilizationTimeout = Duration.ofMillis(0);
         WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000), 
noStabilizationTimeout);
+                new WaitingForResources(
+                        ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, 
stateTransitionManagerFactory);
+        ctx.runScheduledTasks();
 
-        ctx.setHasDesiredResources(() -> false);
+        // make resources available
+        ctx.setHasDesiredResources(() -> true);
         ctx.setHasSufficientResources(() -> true);
         ctx.setExpectCreatingExecutionGraph();
-        wfr.onNewResourcesAvailable();
+        wfr.onNewResourcesAvailable(); // .. and notify
     }
 
     @Test
-    void testNoSchedulingIfStabilizationTimeoutIsConfigured() {
-        Duration stabilizationTimeout = Duration.ofMillis(50000);
-
+    void testSchedulingWithSufficientResources() {
+        final Function<StateTransitionManager.Context, StateTransitionManager>
+                stateTransitionManagerFactory =
+                        context ->
+                                
TestingStateTransitionManager.withOnChangeEventOnly(
+                                        () -> {
+                                            
assertThat(context.hasDesiredResources()).isFalse();
+                                            if 
(context.hasSufficientResources()) {
+                                                
context.transitionToSubsequentState();
+                                            }
+                                        });
         WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000), 
stabilizationTimeout);
-
-        ctx.setHasDesiredResources(() -> false);
-        ctx.setHasSufficientResources(() -> true);
-        wfr.onNewResourcesAvailable();
-        // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
-
+                new WaitingForResources(
+                        ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, 
stateTransitionManagerFactory);
+        ctx.runScheduledTasks();
+        // we expect no state transition.
         assertThat(ctx.hasStateTransition()).isFalse();
-    }
 
-    @Test
-    void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() {
-        Duration initialResourceTimeout = Duration.ofMillis(-1);
-        Duration stabilizationTimeout = Duration.ofMillis(50_000L);
-
-        WaitingForResources wfr =
-                new WaitingForResources(
-                        ctx,
-                        LOG,
-                        initialResourceTimeout,
-                        stabilizationTimeout,
-                        ctx.getClock(),
-                        null);
-        // sufficient resources available
         ctx.setHasDesiredResources(() -> false);
         ctx.setHasSufficientResources(() -> true);
-
-        // notify about sufficient resources
-        wfr.onNewResourcesAvailable();
-
         ctx.setExpectCreatingExecutionGraph();
-
-        // execute all runnables and trigger expected state transition
-        final Duration afterStabilizationTimeout = 
stabilizationTimeout.plusMillis(1);
-        ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis());
-
-        ctx.runScheduledTasks(afterStabilizationTimeout.toMillis());
-
-        assertThat(ctx.hasStateTransition()).isTrue();
+        wfr.onNewResourcesAvailable();
     }
 
     @Test
-    void testStabilizationTimeoutReset() {
-        Duration initialResourceTimeout = Duration.ofMillis(-1);
-        Duration stabilizationTimeout = Duration.ofMillis(50L);
+    void testNoStateTransitionOnNoResourceTimeout() {
+        ctx.setHasDesiredResources(() -> false);
+        ctx.setHasSufficientResources(() -> false);
 
         WaitingForResources wfr =
                 new WaitingForResources(
                         ctx,
                         LOG,
-                        initialResourceTimeout,
-                        stabilizationTimeout,
-                        ctx.getClock(),
-                        null);
-
-        ctx.setHasDesiredResources(() -> false);
-
-        // notify about resources, trigger stabilization timeout
-        ctx.setHasSufficientResources(() -> true);
-        ctx.advanceTimeByMillis(40); // advance time, but don't trigger 
stabilizationTimeout
-        wfr.onNewResourcesAvailable();
-
-        // notify again, but insufficient (reset stabilization timeout)
-        ctx.setHasSufficientResources(() -> false);
-        ctx.advanceTimeByMillis(40);
-        wfr.onNewResourcesAvailable();
-
-        // notify again, but sufficient, trigger timeout
-        ctx.setHasSufficientResources(() -> true);
-        ctx.advanceTimeByMillis(40);
-        wfr.onNewResourcesAvailable();
-
-        // sanity check: no state transition has been triggered so far
-        assertThat(ctx.hasStateTransition()).isFalse();
-        assertThat(ctx.getTestDuration()).isGreaterThan(stabilizationTimeout);
-
-        ctx.setExpectCreatingExecutionGraph();
-
-        ctx.advanceTimeByMillis(1);
-        assertThat(ctx.hasStateTransition()).isFalse();
-
-        ctx.advanceTimeByMillis(stabilizationTimeout.toMillis());
-        assertThat(ctx.hasStateTransition()).isTrue();
-    }
-
-    @Test
-    void testNoStateTransitionOnNoResourceTimeout() {
-        ctx.setHasDesiredResources(() -> false);
-        WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ofMillis(-1), 
STABILIZATION_TIMEOUT);
-
+                        DISABLED_RESOURCE_WAIT_TIMEOUT,
+                        context -> TestingStateTransitionManager.withNoOp());
         ctx.runScheduledTasks();
+
         assertThat(ctx.hasStateTransition()).isFalse();
     }
 
     @Test
     void testStateTransitionOnResourceTimeout() {
-        ctx.setHasDesiredResources(() -> false);
         WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
+                new WaitingForResources(
+                        ctx,
+                        LOG,
+                        Duration.ZERO,
+                        context -> TestingStateTransitionManager.withNoOp());
         ctx.setExpectCreatingExecutionGraph();
-
         ctx.runScheduledTasks();
     }
 
@@ -301,10 +294,7 @@ class WaitingForResourcesTest {
                 new PriorityQueue<>(
                         Comparator.comparingLong(o -> 
o.getDelay(TimeUnit.MILLISECONDS)));
 
-        private final ManualTestTime testTime =
-                new ManualTestTime(
-                        (durationSinceTestStart) ->
-                                
runScheduledTasks(durationSinceTestStart.toMillis()));
+        private final ManualClock testingClock = new ManualClock();
 
         public void setHasDesiredResources(Supplier<Boolean> sup) {
             hasDesiredResourcesSupplier = sup;
@@ -361,7 +351,7 @@ class WaitingForResourcesTest {
             LOG.info(
                     "Scheduling work with delay {} for earliest execution at 
{}",
                     delay.toMillis(),
-                    testTime.getClock().absoluteTimeMillis() + 
delay.toMillis());
+                    testingClock.absoluteTimeMillis() + delay.toMillis());
             final ScheduledTask<Void> scheduledTask =
                     new ScheduledTask<>(
                             () -> {
@@ -371,7 +361,7 @@ class WaitingForResourcesTest {
 
                                 return null;
                             },
-                            testTime.getClock().absoluteTimeMillis() + 
delay.toMillis());
+                            testingClock.absoluteTimeMillis() + 
delay.toMillis());
 
             scheduledTasks.add(scheduledTask);
 
@@ -383,51 +373,6 @@ class WaitingForResourcesTest {
             creatingExecutionGraphStateValidator.validateInput(null);
             registerStateTransition();
         }
-
-        public Clock getClock() {
-            return testTime.getClock();
-        }
-
-        public void advanceTimeByMillis(long millis) {
-            testTime.advanceMillis(millis);
-        }
-
-        public Duration getTestDuration() {
-            return testTime.getTestDuration();
-        }
-    }
-
-    private static final class ManualTestTime {
-        private static final Logger LOG = 
LoggerFactory.getLogger(ManualTestTime.class);
-
-        private final ManualClock testingClock = new ManualClock();
-        private final Consumer<Duration> runOnAdvance;
-
-        private Duration durationSinceTestStart = Duration.ZERO;
-
-        private ManualTestTime(Consumer<Duration> runOnAdvance) {
-            this.runOnAdvance = runOnAdvance;
-        }
-
-        private Clock getClock() {
-            return testingClock;
-        }
-
-        public void advanceMillis(long millis) {
-            durationSinceTestStart = durationSinceTestStart.plusMillis(millis);
-
-            LOG.info(
-                    "Advance testing time by {} ms to time {} ms",
-                    millis,
-                    durationSinceTestStart.toMillis());
-
-            testingClock.advanceTime(millis, TimeUnit.MILLISECONDS);
-            runOnAdvance.accept(durationSinceTestStart);
-        }
-
-        public Duration getTestDuration() {
-            return durationSinceTestStart;
-        }
     }
 
     static <T> Consumer<T> assertNonNull() {

Reply via email to