This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c869326d089 [FLINK-36011] [runtime] Generalize RescaleManager to
become StateTransitionManager
c869326d089 is described below
commit c869326d089705475481c2c2ea42a6efabb8c828
Author: Zdenek Tison <[email protected]>
AuthorDate: Fri Jul 12 15:01:55 2024 +0200
[FLINK-36011] [runtime] Generalize RescaleManager to become
StateTransitionManager
---
.../scheduler/adaptive/AdaptiveScheduler.java | 10 +-
.../scheduler/adaptive/DefaultRescaleManager.java | 232 -------
.../adaptive/DefaultStateTransitionManager.java | 434 +++++++++++++
.../runtime/scheduler/adaptive/Executing.java | 38 +-
...aleManager.java => StateTransitionManager.java} | 39 +-
.../adaptive/AdaptiveSchedulerBuilder.java | 14 +-
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 4 +-
.../adaptive/DefaultRescaleManagerTest.java | 675 -------------------
.../DefaultStateTransitionManagerTest.java | 722 +++++++++++++++++++++
.../runtime/scheduler/adaptive/ExecutingTest.java | 41 +-
...ger.java => TestingStateTransitionManager.java} | 16 +-
11 files changed, 1250 insertions(+), 975 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 2fd7b694b73..ebcfc369c82 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -349,7 +349,7 @@ public class AdaptiveScheduler
}
private final Settings settings;
- private final RescaleManager.Factory rescaleManagerFactory;
+ private final StateTransitionManager.Factory stateTransitionManagerFactory;
private final JobGraph jobGraph;
@@ -427,7 +427,7 @@ public class AdaptiveScheduler
throws JobExecutionException {
this(
settings,
- DefaultRescaleManager.Factory.fromSettings(settings),
+ DefaultStateTransitionManager.Factory.fromSettings(settings),
(metricGroup, checkpointStatsListener) ->
new DefaultCheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
@@ -455,7 +455,7 @@ public class AdaptiveScheduler
@VisibleForTesting
AdaptiveScheduler(
Settings settings,
- RescaleManager.Factory rescaleManagerFactory,
+ StateTransitionManager.Factory stateTransitionManagerFactory,
BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener,
CheckpointStatsTracker>
checkpointStatsTrackerFactory,
JobGraph jobGraph,
@@ -480,7 +480,7 @@ public class AdaptiveScheduler
assertPreconditions(jobGraph);
this.settings = settings;
- this.rescaleManagerFactory = rescaleManagerFactory;
+ this.stateTransitionManagerFactory = stateTransitionManagerFactory;
this.jobGraph = jobGraph;
this.jobInfo = new JobInfoImpl(jobGraph.getJobID(),
jobGraph.getName());
@@ -1175,7 +1175,7 @@ public class AdaptiveScheduler
this,
userCodeClassLoader,
failureCollection,
- rescaleManagerFactory,
+ stateTransitionManagerFactory,
settings.getMinParallelismChangeForDesiredRescale(),
settings.getRescaleOnFailedCheckpointCount()));
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
deleted file mode 100644
index 0b0fc013357..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.time.temporal.Temporal;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-/**
- * {@code DefaultRescaleManager} manages triggering the next rescaling based
on when the previous
- * rescale operation happened and the available resources. It handles the
event based on the
- * following phases (in that order):
- *
- * <ol>
- * <li>Cooldown phase: No rescaling takes place (its upper threshold is
defined by {@code
- * scalingIntervalMin}.
- * <li>Soft-rescaling phase: Rescaling is triggered if the desired amount of
resources is
- * available.
- * <li>Hard-rescaling phase: Rescaling is triggered if a sufficient amount
of resources is
- * available (its lower threshold is defined by (@code
scalingIntervalMax}).
- * </ol>
- *
- * <p>Thread-safety: This class is not implemented in a thread-safe manner and
relies on the fact
- * that any method call happens within a single thread.
- *
- * @see Executing
- */
-@NotThreadSafe
-public class DefaultRescaleManager implements RescaleManager {
-
- private static final Logger LOG =
LoggerFactory.getLogger(DefaultRescaleManager.class);
-
- private final Temporal initializationTime;
- private final Supplier<Temporal> clock;
-
- @VisibleForTesting final Duration scalingIntervalMin;
- @VisibleForTesting @Nullable final Duration scalingIntervalMax;
-
- private final RescaleManager.Context rescaleContext;
-
- private boolean rescaleScheduled = false;
-
- @VisibleForTesting final Duration maxTriggerDelay;
-
- /**
- * {@code triggerFuture} is used to allow triggering a scheduled callback.
Rather than
- * scheduling the callback itself, the callback is just chained with the
future. The completion
- * of the future is then scheduled which will, as a consequence, run the
callback as part of the
- * scheduled operation.
- *
- * <p>{@code triggerFuture} can be used to trigger the callback even
earlier (before the
- * scheduled delay has passed). See {@link #onTrigger()}.
- */
- private CompletableFuture<Void> triggerFuture;
-
- DefaultRescaleManager(
- Temporal initializationTime,
- RescaleManager.Context rescaleContext,
- Duration scalingIntervalMin,
- @Nullable Duration scalingIntervalMax,
- Duration maxTriggerDelay) {
- this(
- initializationTime,
- Instant::now,
- rescaleContext,
- scalingIntervalMin,
- scalingIntervalMax,
- maxTriggerDelay);
- }
-
- @VisibleForTesting
- DefaultRescaleManager(
- Temporal initializationTime,
- Supplier<Temporal> clock,
- RescaleManager.Context rescaleContext,
- Duration scalingIntervalMin,
- @Nullable Duration scalingIntervalMax,
- Duration maxTriggerDelay) {
- this.initializationTime = initializationTime;
- this.clock = clock;
-
- this.maxTriggerDelay = maxTriggerDelay;
- this.triggerFuture = FutureUtils.completedVoidFuture();
-
- Preconditions.checkArgument(
- scalingIntervalMax == null ||
scalingIntervalMin.compareTo(scalingIntervalMax) <= 0,
- "scalingIntervalMax should at least match or be longer than
scalingIntervalMin.");
- this.scalingIntervalMin = scalingIntervalMin;
- this.scalingIntervalMax = scalingIntervalMax;
-
- this.rescaleContext = rescaleContext;
- }
-
- @Override
- public void onChange() {
- if (this.triggerFuture.isDone()) {
- this.triggerFuture =
scheduleOperationWithTrigger(this::evaluateChangeEvent);
- }
- }
-
- @Override
- public void onTrigger() {
- if (!this.triggerFuture.isDone()) {
- this.triggerFuture.complete(null);
- LOG.debug(
- "A rescale trigger event was observed causing the rescale
verification logic to be initiated.");
- } else {
- LOG.debug(
- "A rescale trigger event was observed outside of a rescale
cycle. No action taken.");
- }
- }
-
- private void evaluateChangeEvent() {
- if (timeSinceLastRescale().compareTo(scalingIntervalMin) > 0) {
- maybeRescale();
- } else if (!rescaleScheduled) {
- rescaleScheduled = true;
- rescaleContext.scheduleOperation(this::maybeRescale,
scalingIntervalMin);
- }
- }
-
- private CompletableFuture<Void> scheduleOperationWithTrigger(Runnable
callback) {
- final CompletableFuture<Void> triggerFuture = new
CompletableFuture<>();
- triggerFuture.thenRun(callback);
- this.rescaleContext.scheduleOperation(
- () -> triggerFuture.complete(null), this.maxTriggerDelay);
-
- return triggerFuture;
- }
-
- private Duration timeSinceLastRescale() {
- return Duration.between(this.initializationTime, clock.get());
- }
-
- private void maybeRescale() {
- rescaleScheduled = false;
- if (rescaleContext.hasDesiredResources()) {
- LOG.info("Desired parallelism for job was reached: Rescaling will
be triggered.");
- rescaleContext.rescale();
- } else if (scalingIntervalMax != null) {
- LOG.info(
- "The longer the pipeline runs, the more the (small)
resource gain is worth the restarting time. "
- + "Last resource added does not meet the
configured minimal parallelism change. Forced rescaling will be triggered after
{} if the resource is still there.",
- scalingIntervalMax);
-
- // reasoning for inconsistent scheduling:
- // https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr
- if (timeSinceLastRescale().compareTo(scalingIntervalMax) > 0) {
- rescaleWithSufficientResources();
- } else {
- rescaleContext.scheduleOperation(
- this::rescaleWithSufficientResources,
scalingIntervalMax);
- }
- }
- }
-
- private void rescaleWithSufficientResources() {
- if (rescaleContext.hasSufficientResources()) {
- LOG.info(
- "Resources for desired job parallelism couldn't be
collected after {}: Rescaling will be enforced.",
- scalingIntervalMax);
- rescaleContext.rescale();
- }
- }
-
- public static class Factory implements RescaleManager.Factory {
-
- private final Duration scalingIntervalMin;
- @Nullable private final Duration scalingIntervalMax;
- private final Duration maximumDelayForTrigger;
-
- /**
- * Creates a {@code Factory} instance based on the {@link
AdaptiveScheduler}'s {@code
- * Settings} for rescaling.
- */
- public static Factory fromSettings(AdaptiveScheduler.Settings
settings) {
- // it's not ideal that we use a AdaptiveScheduler internal class
here. We might want to
- // change that as part of a more general alignment of the
rescaling configuration.
- return new Factory(
- settings.getScalingIntervalMin(),
- settings.getScalingIntervalMax(),
- settings.getMaximumDelayForTriggeringRescale());
- }
-
- private Factory(
- Duration scalingIntervalMin,
- @Nullable Duration scalingIntervalMax,
- Duration maximumDelayForTrigger) {
- this.scalingIntervalMin = scalingIntervalMin;
- this.scalingIntervalMax = scalingIntervalMax;
- this.maximumDelayForTrigger = maximumDelayForTrigger;
- }
-
- @Override
- public DefaultRescaleManager create(Context rescaleContext, Instant
lastRescale) {
- return new DefaultRescaleManager(
- lastRescale,
- rescaleContext,
- scalingIntervalMin,
- scalingIntervalMax,
- maximumDelayForTrigger);
- }
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
new file mode 100644
index 00000000000..aa1ea2c965d
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+/**
+ * {@code DefaultStateTransitionManager} is a state machine which manages the
{@link
+ * AdaptiveScheduler}'s state transitions based on the previous transition
time and the available
+ * resources. See {@link Phase} for details on each individual phase of this
state machine. Note: We
+ * use the term phase here to avoid confusion with the state used in the
{@link AdaptiveScheduler}.
+ *
+ * <pre>
+ * {@link Cooldown}
+ * |
+ * +--> {@link Idling}
+ * | |
+ * | V
+ * +--> {@link Stabilizing}
+ * |
+ * +--> {@link Stabilized} --> {@link Idling}
+ * | |
+ * | V
+ * \--> {@link Transitioning}
+ * </pre>
+ *
+ * <p>Thread-safety: This class is not implemented in a thread-safe manner and
relies on the fact
+ * that any method call happens within a single thread.
+ *
+ * @see Executing
+ */
+@NotThreadSafe
+public class DefaultStateTransitionManager implements StateTransitionManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultStateTransitionManager.class);
+
+ private final Supplier<Temporal> clock;
+ private final StateTransitionManager.Context transitionContext;
+ private Phase phase;
+ private final List<ScheduledFuture<?>> scheduledFutures;
+
+ @VisibleForTesting final Duration cooldownTimeout;
+ @Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
+ @VisibleForTesting final Duration maxTriggerDelay;
+
+ DefaultStateTransitionManager(
+ Temporal initializationTime,
+ StateTransitionManager.Context transitionContext,
+ Duration cooldownTimeout,
+ @Nullable Duration resourceStabilizationTimeout,
+ Duration maxTriggerDelay) {
+ this(
+ initializationTime,
+ Instant::now,
+ transitionContext,
+ cooldownTimeout,
+ resourceStabilizationTimeout,
+ maxTriggerDelay);
+ }
+
+ @VisibleForTesting
+ DefaultStateTransitionManager(
+ Temporal initializationTime,
+ Supplier<Temporal> clock,
+ StateTransitionManager.Context transitionContext,
+ Duration cooldownTimeout,
+ @Nullable Duration resourceStabilizationTimeout,
+ Duration maxTriggerDelay) {
+
+ this.clock = clock;
+ this.maxTriggerDelay = maxTriggerDelay;
+ this.cooldownTimeout = cooldownTimeout;
+ this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+ this.transitionContext = transitionContext;
+ this.scheduledFutures = new ArrayList<>();
+ this.phase = new Cooldown(initializationTime, clock, this,
cooldownTimeout);
+ }
+
+ @Override
+ public void onChange() {
+ phase.onChange();
+ }
+
+ @Override
+ public void onTrigger() {
+ phase.onTrigger();
+ }
+
+ @Override
+ public void close() {
+ scheduledFutures.forEach(future -> future.cancel(true));
+ scheduledFutures.clear();
+ }
+
+ @VisibleForTesting
+ Phase getPhase() {
+ return phase;
+ }
+
+ private void progressToIdling() {
+ progressToPhase(new Idling(clock, this));
+ }
+
+ private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
+ progressToPhase(
+ new Stabilizing(
+ clock,
+ this,
+ resourceStabilizationTimeout,
+ firstChangeEventTimestamp,
+ maxTriggerDelay));
+ }
+
+ private void progressToStabilized(Temporal firstChangeEventTimestamp) {
+ progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp,
maxTriggerDelay));
+ }
+
+ private void triggerTransitionToSubsequentState() {
+ progressToPhase(new Transitioning(clock, this));
+ transitionContext.transitionToSubsequentState();
+ }
+
+ private void progressToPhase(Phase newPhase) {
+ Preconditions.checkState(
+ !(phase instanceof Transitioning),
+ "The state transition operation has already been triggered.");
+ LOG.debug("Transitioning from {} to {}.", phase, newPhase);
+ phase = newPhase;
+ }
+
+ @VisibleForTesting
+ void scheduleFromNow(Runnable callback, Duration delay, Phase phase) {
+ scheduledFutures.add(
+ transitionContext.scheduleOperation(() -> runIfPhase(phase,
callback), delay));
+ }
+
+ private void runIfPhase(Phase expectedPhase, Runnable callback) {
+ if (getPhase() == expectedPhase) {
+ callback.run();
+ } else {
+ LOG.debug(
+ "Ignoring scheduled action because expected phase {} is
not the actual phase {}.",
+ expectedPhase,
+ getPhase());
+ }
+ }
+
+ /** Factory for creating {@link DefaultStateTransitionManager} instances.
*/
+ public static class Factory implements StateTransitionManager.Factory {
+
+ private final Duration cooldownTimeout;
+ @Nullable private final Duration resourceStabilizationTimeout;
+ private final Duration maximumDelayForTrigger;
+
+ /**
+ * Creates a {@code Factory} instance based on the {@link
AdaptiveScheduler}'s {@code
+ * Settings} for rescaling.
+ */
+ public static Factory fromSettings(AdaptiveScheduler.Settings
settings) {
+ // it's not ideal that we use a AdaptiveScheduler internal class
here. We might want to
+ // change that as part of a more general alignment of the
rescaling configuration.
+ return new Factory(
+ settings.getScalingIntervalMin(),
+ settings.getScalingIntervalMax(),
+ settings.getMaximumDelayForTriggeringRescale());
+ }
+
+ private Factory(
+ Duration cooldownTimeout,
+ @Nullable Duration resourceStabilizationTimeout,
+ Duration maximumDelayForTrigger) {
+ this.cooldownTimeout = cooldownTimeout;
+ this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+ this.maximumDelayForTrigger = maximumDelayForTrigger;
+ }
+
+ @Override
+ public DefaultStateTransitionManager create(Context context, Instant
lastStateTransition) {
+ return new DefaultStateTransitionManager(
+ lastStateTransition,
+ context,
+ cooldownTimeout,
+ resourceStabilizationTimeout,
+ maximumDelayForTrigger);
+ }
+ }
+
+ /**
+ * A phase in the state machine of the {@link
DefaultStateTransitionManager}. Each phase is
+ * responsible for a specific part of the state transition process.
+ */
+ @VisibleForTesting
+ abstract static class Phase {
+
+ private final Supplier<Temporal> clock;
+ private final DefaultStateTransitionManager context;
+
+ @VisibleForTesting
+ Phase(Supplier<Temporal> clock, DefaultStateTransitionManager context)
{
+ this.clock = clock;
+ this.context = context;
+ }
+
+ Temporal now() {
+ return clock.get();
+ }
+
+ DefaultStateTransitionManager context() {
+ return context;
+ }
+
+ void scheduleRelativelyTo(Runnable callback, Temporal startOfTimeout,
Duration timeout) {
+ final Duration passedTimeout = Duration.between(startOfTimeout,
now());
+ Preconditions.checkArgument(
+ !passedTimeout.isNegative(),
+ "The startOfTimeout ({}) should be in the past but is
after the current time.",
+ startOfTimeout);
+
+ final Duration timeoutLeft = timeout.minus(passedTimeout);
+ scheduleFromNow(callback, timeoutLeft.isNegative() ? Duration.ZERO
: timeoutLeft);
+ }
+
+ void scheduleFromNow(Runnable callback, Duration delay) {
+ context.scheduleFromNow(callback, delay, this);
+ }
+
+ boolean hasDesiredResources() {
+ return context.transitionContext.hasDesiredResources();
+ }
+
+ boolean hasSufficientResources() {
+ return context.transitionContext.hasSufficientResources();
+ }
+
+ void onChange() {}
+
+ void onTrigger() {}
+ }
+
+ /**
+ * {@link Phase} to prevent any rescaling. {@link
StateTransitionManager#onChange()} events will
+ * be monitored and forwarded to the next phase. {@link
StateTransitionManager#onTrigger()}
+ * events will be ignored.
+ */
+ @VisibleForTesting
+ static final class Cooldown extends Phase {
+
+ @Nullable private Temporal firstChangeEventTimestamp;
+
+ private Cooldown(
+ Temporal timeOfLastRescale,
+ Supplier<Temporal> clock,
+ DefaultStateTransitionManager context,
+ Duration cooldownTimeout) {
+ super(clock, context);
+
+ this.scheduleRelativelyTo(this::finalizeCooldown,
timeOfLastRescale, cooldownTimeout);
+ }
+
+ @Override
+ void onChange() {
+ if (hasSufficientResources() && firstChangeEventTimestamp == null)
{
+ firstChangeEventTimestamp = now();
+ }
+ }
+
+ private void finalizeCooldown() {
+ if (firstChangeEventTimestamp == null) {
+ context().progressToIdling();
+ } else {
+ context().progressToStabilizing(firstChangeEventTimestamp);
+ }
+ }
+ }
+
+ /**
+ * {@link Phase} which follows the {@link Cooldown} phase if no {@link
+ * StateTransitionManager#onChange()} was observed, yet. The {@code
+ * DefaultStateTransitionManager} waits for a first {@link
StateTransitionManager#onChange()}
+ * event. {@link StateTransitionManager#onTrigger()} events will be
ignored.
+ */
+ @VisibleForTesting
+ static final class Idling extends Phase {
+
+ private Idling(Supplier<Temporal> clock, DefaultStateTransitionManager
context) {
+ super(clock, context);
+ }
+
+ @Override
+ void onChange() {
+ if (hasSufficientResources()) {
+ context().progressToStabilizing(now());
+ }
+ }
+ }
+
+ /**
+ * {@link Phase} that handles the resources stabilization. In this phase,
{@link
+ * StateTransitionManager#onTrigger()} will initiate rescaling if desired
resources are met and
+ * {@link StateTransitionManager#onChange()} will schedule the evaluation
of the desired
+ * resources.
+ */
+ static final class Stabilizing extends Phase {
+
+ private Temporal onChangeEventTimestamp;
+ private final Duration maxTriggerDelay;
+ private boolean evaluationScheduled = false;
+
+ private Stabilizing(
+ Supplier<Temporal> clock,
+ DefaultStateTransitionManager context,
+ @Nullable Duration resourceStabilizationTimeout,
+ Temporal firstOnChangeEventTimestamp,
+ Duration maxTriggerDelay) {
+ super(clock, context);
+ this.onChangeEventTimestamp = firstOnChangeEventTimestamp;
+ this.maxTriggerDelay = maxTriggerDelay;
+
+ if (resourceStabilizationTimeout != null) {
+ scheduleRelativelyTo(
+ () ->
context().progressToStabilized(firstOnChangeEventTimestamp),
+ firstOnChangeEventTimestamp,
+ resourceStabilizationTimeout);
+ }
+ scheduleTransitionEvaluation();
+ }
+
+ @Override
+ void onChange() {
+ // schedule another desired-resource evaluation in scenarios where
the previous change
+ // event was already handled by a onTrigger callback with a no-op
+ onChangeEventTimestamp = now();
+ scheduleTransitionEvaluation();
+ }
+
+ @Override
+ void onTrigger() {
+ transitionToSubSequentStateForDesiredResources();
+ }
+
+ private void scheduleTransitionEvaluation() {
+ if (!evaluationScheduled) {
+ evaluationScheduled = true;
+ this.scheduleRelativelyTo(
+ () -> {
+ evaluationScheduled = false;
+ transitionToSubSequentStateForDesiredResources();
+ },
+ onChangeEventTimestamp,
+ maxTriggerDelay);
+ }
+ }
+
+ private void transitionToSubSequentStateForDesiredResources() {
+ if (hasDesiredResources()) {
+ context().triggerTransitionToSubsequentState();
+ } else {
+ LOG.debug(
+ "Desired resources are not met, skipping the
transition to the subsequent state.");
+ }
+ }
+ }
+
+ /**
+ * {@link Phase} that handles the post-stabilization phase. A {@link
+ * StateTransitionManager#onTrigger()} event initiates rescaling if
sufficient resources are
+ * available; otherwise transitioning to {@link Idling} will be performed.
+ */
+ @VisibleForTesting
+ static final class Stabilized extends Phase {
+
+ private Stabilized(
+ Supplier<Temporal> clock,
+ DefaultStateTransitionManager context,
+ Temporal firstChangeEventTimestamp,
+ Duration maxTriggerDelay) {
+ super(clock, context);
+ this.scheduleRelativelyTo(this::onTrigger,
firstChangeEventTimestamp, maxTriggerDelay);
+ }
+
+ @Override
+ void onTrigger() {
+ if (hasSufficientResources()) {
+ context().triggerTransitionToSubsequentState();
+ } else {
+ LOG.debug("Sufficient resources are not met, progressing to
idling.");
+ context().progressToIdling();
+ }
+ }
+ }
+
+ /**
+ * In this final {@link Phase} no additional transition is possible: {@link
+ * StateTransitionManager#onChange()} and {@link
StateTransitionManager#onTrigger()} events will
+ * be ignored.
+ */
+ @VisibleForTesting
+ static final class Transitioning extends Phase {
+ private Transitioning(Supplier<Temporal> clock,
DefaultStateTransitionManager context) {
+ super(clock, context);
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 1fcd23884f5..f7dcb8d97cf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -58,13 +58,13 @@ import java.util.stream.Collectors;
/** State which represents a running job with an {@link ExecutionGraph} and
assigned slots. */
class Executing extends StateWithExecutionGraph
- implements ResourceListener, RescaleManager.Context,
CheckpointStatsListener {
+ implements ResourceListener, StateTransitionManager.Context,
CheckpointStatsListener {
private final Context context;
private final RescalingController sufficientResourcesController;
private final RescalingController desiredResourcesController;
- private final RescaleManager rescaleManager;
+ private final StateTransitionManager stateTransitionManager;
private final int rescaleOnFailedCheckpointCount;
// null indicates that there was no change event observed, yet
@Nullable private AtomicInteger failedCheckpointCountdown;
@@ -77,7 +77,7 @@ class Executing extends StateWithExecutionGraph
Context context,
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
- RescaleManager.Factory rescaleManagerFactory,
+ StateTransitionManager.Factory stateTransitionManagerFactory,
int minParallelismChangeForRescale,
int rescaleOnFailedCheckpointCount,
Instant lastRescale) {
@@ -96,7 +96,7 @@ class Executing extends StateWithExecutionGraph
this.sufficientResourcesController = new
EnforceParallelismChangeRescalingController();
this.desiredResourcesController =
new
EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale);
- this.rescaleManager = rescaleManagerFactory.create(this, lastRescale);
+ this.stateTransitionManager =
stateTransitionManagerFactory.create(this, lastRescale);
Preconditions.checkArgument(
rescaleOnFailedCheckpointCount > 0,
@@ -110,8 +110,8 @@ class Executing extends StateWithExecutionGraph
context.runIfState(
this,
() -> {
- rescaleManager.onChange();
- rescaleManager.onTrigger();
+ stateTransitionManager.onChange();
+ stateTransitionManager.onTrigger();
},
Duration.ZERO);
}
@@ -147,12 +147,12 @@ class Executing extends StateWithExecutionGraph
}
@Override
- public void scheduleOperation(Runnable callback, Duration delay) {
- context.runIfState(this, callback, delay);
+ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration
delay) {
+ return context.runIfState(this, callback, delay);
}
@Override
- public void rescale() {
+ public void transitionToSubsequentState() {
context.goToRestarting(
getExecutionGraph(),
getExecutionGraphHandler(),
@@ -186,6 +186,12 @@ class Executing extends StateWithExecutionGraph
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
}
+ @Override
+ public void onLeave(Class<? extends State> newState) {
+ stateTransitionManager.close();
+ super.onLeave(newState);
+ }
+
private void deploy() {
for (ExecutionJobVertex executionJobVertex :
getExecutionGraph().getVerticesTopologically()) {
@@ -212,13 +218,13 @@ class Executing extends StateWithExecutionGraph
@Override
public void onNewResourcesAvailable() {
- rescaleManager.onChange();
+ stateTransitionManager.onChange();
initializeFailedCheckpointCountdownIfUnset();
}
@Override
public void onNewResourceRequirements() {
- rescaleManager.onChange();
+ stateTransitionManager.onChange();
initializeFailedCheckpointCountdownIfUnset();
}
@@ -236,7 +242,7 @@ class Executing extends StateWithExecutionGraph
}
private void triggerPotentialRescale() {
- rescaleManager.onTrigger();
+ stateTransitionManager.onTrigger();
this.failedCheckpointCountdown = null;
}
@@ -322,7 +328,7 @@ class Executing extends StateWithExecutionGraph
private final OperatorCoordinatorHandler operatorCoordinatorHandler;
private final ClassLoader userCodeClassLoader;
private final List<ExceptionHistoryEntry> failureCollection;
- private final RescaleManager.Factory rescaleManagerFactory;
+ private final StateTransitionManager.Factory
stateTransitionManagerFactory;
private final int minParallelismChangeForRescale;
private final int rescaleOnFailedCheckpointCount;
@@ -334,7 +340,7 @@ class Executing extends StateWithExecutionGraph
Context context,
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
- RescaleManager.Factory rescaleManagerFactory,
+ StateTransitionManager.Factory stateTransitionManagerFactory,
int minParallelismChangeForRescale,
int rescaleOnFailedCheckpointCount) {
this.context = context;
@@ -344,7 +350,7 @@ class Executing extends StateWithExecutionGraph
this.operatorCoordinatorHandler = operatorCoordinatorHandler;
this.userCodeClassLoader = userCodeClassLoader;
this.failureCollection = failureCollection;
- this.rescaleManagerFactory = rescaleManagerFactory;
+ this.stateTransitionManagerFactory = stateTransitionManagerFactory;
this.minParallelismChangeForRescale =
minParallelismChangeForRescale;
this.rescaleOnFailedCheckpointCount =
rescaleOnFailedCheckpointCount;
}
@@ -362,7 +368,7 @@ class Executing extends StateWithExecutionGraph
context,
userCodeClassLoader,
failureCollection,
- rescaleManagerFactory,
+ stateTransitionManagerFactory,
minParallelismChangeForRescale,
rescaleOnFailedCheckpointCount,
Instant.now());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
similarity index 55%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
index a2e53bbe027..a6a6aaaca2e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
@@ -20,21 +20,30 @@ package org.apache.flink.runtime.scheduler.adaptive;
import java.time.Duration;
import java.time.Instant;
+import java.util.concurrent.ScheduledFuture;
-/** The {@code RescaleManager} decides on whether rescaling should happen or
not. */
-public interface RescaleManager {
+/**
+ * The {@code StateTransitionManager} decides on whether {@link
AdaptiveScheduler} state transition
+ * should happen or not.
+ */
+public interface StateTransitionManager {
- /** Is called if the environment changed in a way that a rescaling could
be considered. */
+ /**
+ * Is called if the environment changed in a way that a state transition
could be considered.
+ */
void onChange();
/**
* Is called when any previous observed environment changes shall be
verified possibly
- * triggering a rescale operation.
+ * triggering a state transition operation.
*/
void onTrigger();
+ /** Is called when the state transition manager should be closed. */
+ default void close() {}
+
/**
- * The interface that can be used by the {@code RescaleManager} to
communicate with the
+ * The interface that can be used by the {@code StateTransitionManager} to
communicate with the
* underlying system.
*/
interface Context {
@@ -51,20 +60,24 @@ public interface RescaleManager {
*/
boolean hasDesiredResources();
- /** Triggers the rescaling of the job. */
- void rescale();
+ /** Triggers the transition to the subsequent state of the {@link
AdaptiveScheduler}. */
+ void transitionToSubsequentState();
- /** Runs operation with a given delay in the underlying main thread. */
- void scheduleOperation(Runnable callback, Duration delay);
+ /**
+ * Runs operation with a given delay in the underlying main thread.
+ *
+ * @return a ScheduledFuture representing pending completion of the
operation.
+ */
+ ScheduledFuture<?> scheduleOperation(Runnable callback, Duration
delay);
}
- /** Interface for creating {@code RescaleManager} instances. */
+ /** Interface for creating {@code StateTransitionManager} instances. */
interface Factory {
/**
- * Creates a {@code RescaleManager} instance for the given {@code
rescaleContext} and
- * previous rescale time.
+ * Creates a {@code StateTransitionManager} instance for the given
{@code Context} and
+ * previous state transition time.
*/
- RescaleManager create(Context rescaleContext, Instant lastRescale);
+ StateTransitionManager create(Context context, Instant
lastStateTransition);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 433c6d66a90..26bd64e68b8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -97,7 +97,7 @@ public class AdaptiveSchedulerBuilder {
/**
* {@code null} indicates that the default factory will be used based on
the set configuration.
*/
- @Nullable private RescaleManager.Factory rescaleManagerFactory = null;
+ @Nullable private StateTransitionManager.Factory
stateTransitionManagerFactory = null;
private BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener,
CheckpointStatsTracker>
checkpointStatsTrackerFactory =
@@ -224,9 +224,9 @@ public class AdaptiveSchedulerBuilder {
return this;
}
- public AdaptiveSchedulerBuilder setRescaleManagerFactory(
- @Nullable RescaleManager.Factory rescaleManagerFactory) {
- this.rescaleManagerFactory = rescaleManagerFactory;
+ public AdaptiveSchedulerBuilder setStateTransitionManagerFactory(
+ @Nullable StateTransitionManager.Factory
stateTransitionManagerFactory) {
+ this.stateTransitionManagerFactory = stateTransitionManagerFactory;
return this;
}
@@ -259,9 +259,9 @@ public class AdaptiveSchedulerBuilder {
AdaptiveScheduler.Settings.of(jobMasterConfiguration);
return new AdaptiveScheduler(
settings,
- rescaleManagerFactory == null
- ? DefaultRescaleManager.Factory.fromSettings(settings)
- : rescaleManagerFactory,
+ stateTransitionManagerFactory == null
+ ?
DefaultStateTransitionManager.Factory.fromSettings(settings)
+ : stateTransitionManagerFactory,
checkpointStatsTrackerFactory,
jobGraph,
jobResourceRequirements,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index ab0b96fa765..5bc3d859c48 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -2306,8 +2306,8 @@ public class AdaptiveSchedulerTest {
EXECUTOR_RESOURCE.getExecutor())
.setJobMasterConfiguration(config)
.setDeclarativeSlotPool(slotPool)
- .setRescaleManagerFactory(
- new TestingRescaleManager.Factory(
+ .setStateTransitionManagerFactory(
+ new TestingStateTransitionManager.Factory(
() -> {},
() -> {
singleThreadMainThreadExecutor
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
deleted file mode 100644
index 94e6a57ea09..00000000000
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
+++ /dev/null
@@ -1,675 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.util.ConfigurationException;
-
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-class DefaultRescaleManagerTest {
-
- @Test
- void testProperConfiguration() throws ConfigurationException {
- final Duration scalingIntervalMin = Duration.ofMillis(1337);
- final Duration scalingIntervalMax = Duration.ofMillis(7331);
- final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
-
- final Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN,
scalingIntervalMin);
- configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX,
scalingIntervalMax);
- configuration.set(
- JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER,
maximumDelayForRescaleTrigger);
-
- final DefaultRescaleManager testInstance =
- DefaultRescaleManager.Factory.fromSettings(
- AdaptiveScheduler.Settings.of(configuration))
- .create(TestingRescaleManagerContext.stableContext(),
Instant.now());
-
assertThat(testInstance.scalingIntervalMin).isEqualTo(scalingIntervalMin);
-
assertThat(testInstance.scalingIntervalMax).isEqualTo(scalingIntervalMax);
-
assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger);
- }
-
- @Test
- void testInvalidConfiguration() {
- final Duration cooldownThreshold = Duration.ofMinutes(2);
- final TestingRescaleManagerContext ctx =
TestingRescaleManagerContext.stableContext();
- assertThatThrownBy(
- () ->
- new DefaultRescaleManager(
- Instant.now(),
- ctx,
- cooldownThreshold,
- cooldownThreshold.minusNanos(1),
- Duration.ofHours(5)))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- void triggerWithoutChangeEventNoopInCooldownPhase() {
- triggerWithoutChangeEventNoop(
-
TestingRescaleManagerContext::createTestInstanceInCooldownPhase);
- }
-
- @Test
- void triggerWithoutChangeEventNoopInSoftRescalingPhase() {
- triggerWithoutChangeEventNoop(
-
TestingRescaleManagerContext::createTestInstanceInSoftRescalePhase);
- }
-
- @Test
- void triggerWithoutChangeEventNoopInHardRescalingPhase() {
- triggerWithoutChangeEventNoop(
-
TestingRescaleManagerContext::createTestInstanceInHardRescalePhase);
- }
-
- private void triggerWithoutChangeEventNoop(
- Function<TestingRescaleManagerContext, DefaultRescaleManager>
testInstanceCreator) {
- final TestingRescaleManagerContext ctx =
-
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
- final DefaultRescaleManager testInstance =
testInstanceCreator.apply(ctx);
-
- testInstance.onTrigger();
-
- assertThat(ctx.rescaleWasTriggered())
- .as(
- "No rescaling should have been triggered due to the
missing change event despite the fact that desired rescaling would be
possible.")
- .isFalse();
- assertThat(ctx.additionalTasksWaiting()).as("No tasks should be
scheduled.").isFalse();
- }
-
- @Test
- void testDesiredChangeEventDuringCooldown() {
- final TestingRescaleManagerContext softScalePossibleCtx =
-
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
- final DefaultRescaleManager testInstance =
- softScalePossibleCtx.createTestInstanceInCooldownPhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(softScalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(softScalePossibleCtx);
-
- softScalePossibleCtx.transitionIntoSoftScalingTimeframe();
-
- assertFinalStateWithRescale(softScalePossibleCtx);
- }
-
- @Test
- void testDesiredChangeEventInSoftRescalePhase() {
- final TestingRescaleManagerContext desiredRescalePossibleCtx =
-
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
- final DefaultRescaleManager testInstance =
-
desiredRescalePossibleCtx.createTestInstanceInSoftRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(desiredRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertFinalStateWithRescale(desiredRescalePossibleCtx);
- }
-
- @Test
- void testDesiredChangeEventInHardRescalePhase() {
- final TestingRescaleManagerContext desiredRescalePossibleCtx =
-
TestingRescaleManagerContext.stableContext().withDesiredRescaling();
- final DefaultRescaleManager testInstance =
-
desiredRescalePossibleCtx.createTestInstanceInHardRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(desiredRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertFinalStateWithRescale(desiredRescalePossibleCtx);
- }
-
- @Test
- void testNoRescaleInCooldownPhase() {
- final TestingRescaleManagerContext noRescalePossibleCtx =
- TestingRescaleManagerContext.stableContext();
- final DefaultRescaleManager testInstance =
- noRescalePossibleCtx.createTestInstanceInCooldownPhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
- noRescalePossibleCtx.transitionIntoSoftScalingTimeframe();
-
- assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
- noRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
- assertThat(noRescalePossibleCtx.rescaleWasTriggered())
- .as("No rescaling should have happened even in the
hard-rescaling phase.")
- .isFalse();
- assertThat(noRescalePossibleCtx.additionalTasksWaiting())
- .as("No further tasks should have been waiting for execution.")
- .isTrue();
- }
-
- @Test
- void testNoRescaleInSoftRescalePhase() {
- final TestingRescaleManagerContext noRescalePossibleCtx =
- TestingRescaleManagerContext.stableContext();
- final DefaultRescaleManager testInstance =
- noRescalePossibleCtx.createTestInstanceInSoftRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
- noRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
- assertThat(noRescalePossibleCtx.rescaleWasTriggered())
- .as("No rescaling should have happened even in the
hard-rescaling phase.")
- .isFalse();
- assertThat(noRescalePossibleCtx.additionalTasksWaiting())
- .as("No further tasks should have been waiting for execution.")
- .isTrue();
- }
-
- @Test
- void testNoResaleInHardRescalePhase() {
- final TestingRescaleManagerContext noRescalePossibleCtx =
- TestingRescaleManagerContext.stableContext();
- final DefaultRescaleManager testInstance =
- noRescalePossibleCtx.createTestInstanceInHardRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(noRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertThat(noRescalePossibleCtx.rescaleWasTriggered())
- .as("No rescaling should have happened even in the
hard-rescaling phase.")
- .isFalse();
- assertThat(noRescalePossibleCtx.additionalTasksWaiting())
- .as("No further tasks should have been waiting for execution.")
- .isTrue();
- }
-
- @Test
- void testSufficientChangeInCooldownPhase() {
- final TestingRescaleManagerContext hardRescalePossibleCtx =
-
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
- final DefaultRescaleManager testInstance =
- hardRescalePossibleCtx.createTestInstanceInCooldownPhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
- hardRescalePossibleCtx.transitionIntoSoftScalingTimeframe();
-
- assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
- hardRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
- assertFinalStateWithRescale(hardRescalePossibleCtx);
- }
-
- @Test
- void testSufficientChangeInSoftRescalePhase() {
- final TestingRescaleManagerContext hardRescalePossibleCtx =
-
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
- final DefaultRescaleManager testInstance =
- hardRescalePossibleCtx.createTestInstanceInSoftRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
- hardRescalePossibleCtx.transitionIntoHardScalingTimeframe();
-
- assertFinalStateWithRescale(hardRescalePossibleCtx);
- }
-
- @Test
- void testSufficientChangeInHardRescalePhase() {
- final TestingRescaleManagerContext hardRescalePossibleCtx =
-
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
- final DefaultRescaleManager testInstance =
- hardRescalePossibleCtx.createTestInstanceInHardRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(hardRescalePossibleCtx);
-
- testInstance.onTrigger();
-
- assertFinalStateWithRescale(hardRescalePossibleCtx);
- }
-
- @Test
- void
testSufficientChangeInCooldownWithSubsequentDesiredChangeInSoftRescalePhase() {
- final TestingRescaleManagerContext ctx =
-
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
- final DefaultRescaleManager testInstance =
ctx.createTestInstanceInCooldownPhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- ctx.transitionIntoSoftScalingTimeframe();
-
- ctx.withDesiredRescaling();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertThat(ctx.rescaleWasTriggered()).isTrue();
- assertThat(ctx.numberOfTasksWaiting())
- .as(
- "There should be a task scheduled that allows
transitioning into hard-rescaling phase.")
- .isEqualTo(3);
- }
-
- @Test
- void testSufficientChangeWithSubsequentDesiredChangeInSoftRescalePhase() {
- final TestingRescaleManagerContext ctx =
-
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
- final DefaultRescaleManager testInstance =
ctx.createTestInstanceInSoftRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- assertThat(ctx.numberOfTasksWaiting())
- .as(
- "There should be a task scheduled that allows
transitioning into hard-rescaling phase.")
- .isEqualTo(2);
-
- ctx.withDesiredRescaling();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertThat(ctx.rescaleWasTriggered()).isTrue();
- }
-
- @Test
- void
-
testRevokedSufficientChangeInSoftRescalePhaseWithSubsequentSufficientChangeInHardRescalingPhase()
{
- final TestingRescaleManagerContext ctx =
-
TestingRescaleManagerContext.stableContext().withSufficientRescaling();
- final DefaultRescaleManager testInstance =
ctx.createTestInstanceInSoftRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- assertThat(ctx.numberOfTasksWaiting())
- .as(
- "There should be a task scheduled that allows
transitioning into hard-rescaling phase.")
- .isEqualTo(2);
-
- ctx.revertAnyParallelismImprovements();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- assertThat(ctx.numberOfTasksWaiting())
- .as(
- "There should be a task scheduled that allows
transitioning into hard-rescaling phase.")
- .isEqualTo(2);
-
- ctx.transitionIntoHardScalingTimeframe();
-
- assertThat(ctx.rescaleWasTriggered())
- .as(
- "No rescaling should have been triggered because of
the previous revert of the additional resources.")
- .isFalse();
- assertThat(ctx.additionalTasksWaiting())
- .as(
- "The transition to hard-rescaling should have happened
without any additional tasks in waiting state.")
- .isTrue();
-
- ctx.withSufficientRescaling();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertFinalStateWithRescale(ctx);
- }
-
- @Test
- void
testRevokedChangeInHardRescalingPhaseCausesWithSubsequentSufficientChange() {
- final TestingRescaleManagerContext ctx =
TestingRescaleManagerContext.stableContext();
- final DefaultRescaleManager testInstance =
ctx.createTestInstanceInHardRescalePhase();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertThat(ctx.rescaleWasTriggered()).isFalse();
- assertThat(ctx.additionalTasksWaiting()).isTrue();
-
- ctx.withSufficientRescaling();
-
- testInstance.onChange();
-
- assertIntermediateStateWithoutRescale(ctx);
-
- testInstance.onTrigger();
-
- assertFinalStateWithRescale(ctx);
- }
-
- private static void
assertIntermediateStateWithoutRescale(TestingRescaleManagerContext ctx) {
- assertThat(ctx.rescaleWasTriggered())
- .as("The rescale should not have been triggered, yet.")
- .isFalse();
- assertThat(ctx.additionalTasksWaiting())
- .as("There should be still tasks being scheduled.")
- .isTrue();
- }
-
- private static void
assertFinalStateWithRescale(TestingRescaleManagerContext ctx) {
- assertThat(ctx.rescaleWasTriggered())
- .as("The rescale should have been triggered already.")
- .isTrue();
- assertThat(ctx.additionalTasksWaiting())
- .as("All scheduled tasks should have been executed.")
- .isTrue();
- }
-
- /**
- * {@code TestingRescaleManagerContext} provides methods for adjusting the
elapsed time and for
- * adjusting the available resources for rescaling.
- */
- private static class TestingRescaleManagerContext implements
RescaleManager.Context {
-
- // default configuration values to allow for easy transitioning
between the phases
- private static final Duration SCALING_MIN = Duration.ofHours(1);
- private static final Duration SCALING_MAX = Duration.ofHours(2);
-
- // configuration that defines what kind of rescaling would be possible
- private boolean hasSufficientResources = false;
- private boolean hasDesiredResources = false;
-
- // internal state used for assertions
- private final AtomicBoolean rescaleTriggered = new AtomicBoolean();
- private final SortedMap<Instant, List<Runnable>> scheduledTasks = new
TreeMap<>();
-
- // Instant.MIN makes debugging easier because timestamps become
human-readable
- private final Instant initializationTime = Instant.MIN;
- private Duration elapsedTime = Duration.ZERO;
-
- // ///////////////////////////////////////////////
- // Context creation
- // ///////////////////////////////////////////////
-
- public static TestingRescaleManagerContext stableContext() {
- return new TestingRescaleManagerContext();
- }
-
- private TestingRescaleManagerContext() {
- // no rescaling is enabled by default
- revertAnyParallelismImprovements();
- }
-
- public void revertAnyParallelismImprovements() {
- this.hasSufficientResources = false;
- this.hasDesiredResources = false;
- }
-
- public TestingRescaleManagerContext withDesiredRescaling() {
- // having desired resources should also mean that the sufficient
resources are met
- this.hasSufficientResources = true;
- this.hasDesiredResources = true;
-
- return this;
- }
-
- public TestingRescaleManagerContext withSufficientRescaling() {
- this.hasSufficientResources = true;
- this.hasDesiredResources = false;
-
- return this;
- }
-
- // ///////////////////////////////////////////////
- // RescaleManager.Context interface methods
- // ///////////////////////////////////////////////
-
- @Override
- public boolean hasSufficientResources() {
- return this.hasSufficientResources;
- }
-
- @Override
- public boolean hasDesiredResources() {
- return this.hasDesiredResources;
- }
-
- @Override
- public void rescale() {
- rescaleTriggered.set(true);
- }
-
- @Override
- public void scheduleOperation(Runnable callback, Duration delay) {
- final Instant triggerTime =
-
Objects.requireNonNull(initializationTime).plus(elapsedTime).plus(delay);
- if (!scheduledTasks.containsKey(triggerTime)) {
- scheduledTasks.put(triggerTime, new ArrayList<>());
- }
-
- scheduledTasks.get(triggerTime).add(callback);
- }
-
- // ///////////////////////////////////////////////
- // Test instance creation
- // ///////////////////////////////////////////////
-
- /**
- * Creates the {@code DefaultRescaleManager} test instance and
transitions into a period in
- * time where the instance is in cooldown phase.
- */
- public DefaultRescaleManager createTestInstanceInCooldownPhase() {
- return createTestInstance(this::transitionIntoCooldownTimeframe);
- }
-
- /**
- * Creates the {@code DefaultRescaleManager} test instance and
transitions into a period in
- * time where the instance is in soft-rescaling phase.
- */
- public DefaultRescaleManager createTestInstanceInSoftRescalePhase() {
- return
createTestInstance(this::transitionIntoSoftScalingTimeframe);
- }
-
- /**
- * Creates the {@code DefaultRescaleManager} test instance and
transitions into a period in
- * time where the instance is in hard-rescaling phase.
- */
- public DefaultRescaleManager createTestInstanceInHardRescalePhase() {
- return
createTestInstance(this::transitionIntoHardScalingTimeframe);
- }
-
- /**
- * Initializes the test instance and sets the context's elapsed time
based on the passed
- * callback.
- */
- private DefaultRescaleManager createTestInstance(Runnable
timeTransitioning) {
- final DefaultRescaleManager testInstance =
- new DefaultRescaleManager(
- initializationTime,
- // clock that returns the time based on the
configured elapsedTime
- () ->
Objects.requireNonNull(initializationTime).plus(elapsedTime),
- this,
- SCALING_MIN,
- SCALING_MAX,
- Duration.ofHours(5)) {
- @Override
- public void onChange() {
- super.onChange();
-
- // hack to avoid calling this method in every test
method
- // we want to trigger tasks that are meant to run
right-away
-
TestingRescaleManagerContext.this.triggerOutdatedTasks();
- }
-
- @Override
- public void onTrigger() {
- super.onTrigger();
-
- // hack to avoid calling this method in every test
method
- // we want to trigger tasks that are meant to run
right-away
-
TestingRescaleManagerContext.this.triggerOutdatedTasks();
- }
- };
-
- timeTransitioning.run();
- return testInstance;
- }
-
- // ///////////////////////////////////////////////
- // Time-adjustment functionality
- // ///////////////////////////////////////////////
-
- /**
- * Transitions the context's time to a moment that falls into the test
instance's cooldown
- * phase.
- */
- public void transitionIntoCooldownTimeframe() {
- this.elapsedTime = SCALING_MIN.dividedBy(2);
- this.triggerOutdatedTasks();
- }
-
- /**
- * Transitions the context's time to a moment that falls into the test
instance's
- * soft-scaling phase.
- */
- public void transitionIntoSoftScalingTimeframe() {
- // the state transition is scheduled based on the current event's
time rather than the
- // initializationTime
- this.elapsedTime = elapsedTime.plus(SCALING_MIN);
-
- // make sure that we're still below the scalingIntervalMax
- this.elapsedTime =
elapsedTime.plus(SCALING_MAX.minus(elapsedTime).dividedBy(2));
- this.triggerOutdatedTasks();
- }
-
- /**
- * Transitions the context's time to a moment that falls into the test
instance's
- * hard-scaling phase.
- */
- public void transitionIntoHardScalingTimeframe() {
- // the state transition is scheduled based on the current event's
time rather than the
- // initializationTime
- this.elapsedTime = elapsedTime.plus(SCALING_MAX).plusMinutes(1);
- this.triggerOutdatedTasks();
- }
-
- private void triggerOutdatedTasks() {
- while (!scheduledTasks.isEmpty()) {
- final Instant timeOfExecution = scheduledTasks.firstKey();
- if (!timeOfExecution.isAfter(
-
Objects.requireNonNull(initializationTime).plus(elapsedTime))) {
-
scheduledTasks.remove(timeOfExecution).forEach(Runnable::run);
- } else {
- break;
- }
- }
- }
-
- // ///////////////////////////////////////////////
- // Methods for verifying the context's state
- // ///////////////////////////////////////////////
-
- public boolean rescaleWasTriggered() {
- return rescaleTriggered.get();
- }
-
- public int numberOfTasksWaiting() {
- return scheduledTasks.size();
- }
-
- public boolean additionalTasksWaiting() {
- return !scheduledTasks.isEmpty();
- }
- }
-}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
new file mode 100644
index 00000000000..ca90aa2a1c5
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.testutils.ScheduledTask;
+import
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Cooldown;
+import static
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Phase;
+import static
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilized;
+import static
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilizing;
+import static
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Transitioning;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class DefaultStateTransitionManagerTest {
+
+ @Test
+ void testProperConfiguration() throws ConfigurationException {
+ final Duration cooldownTimeout = Duration.ofMillis(1337);
+ final Duration resourceStabilizationTimeout = Duration.ofMillis(7331);
+ final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN,
cooldownTimeout);
+ configuration.set(
+ JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX,
resourceStabilizationTimeout);
+ configuration.set(
+ JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER,
maximumDelayForRescaleTrigger);
+
+ final DefaultStateTransitionManager testInstance =
+ DefaultStateTransitionManager.Factory.fromSettings(
+ AdaptiveScheduler.Settings.of(configuration))
+ .create(
+
TestingStateTransitionManagerContext.stableContext(),
+ Instant.now());
+ assertThat(testInstance.cooldownTimeout).isEqualTo(cooldownTimeout);
+ assertThat(testInstance.resourceStabilizationTimeout)
+ .isEqualTo(resourceStabilizationTimeout);
+
assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger);
+ }
+
+ @Test
+ void testTriggerWithoutChangeEventNoopInCooldownPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+ final DefaultStateTransitionManager testInstance =
ctx.createTestInstanceInCooldownPhase();
+ triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+ }
+
+ @Test
+ void testTriggerWithoutChangeEventNoopInIdlingPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+ triggerWithoutPhaseMove(ctx, testInstance, Idling.class);
+ }
+
+ @Test
+ void testTriggerWithoutChangeEventNoopInTransitioningPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceInTransitioningPhase();
+ triggerWithoutPhaseMove(ctx, testInstance, Transitioning.class);
+ }
+
+ @Test
+ void testStateTransitionRightAfterCooldown() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+ final DefaultStateTransitionManager testInstance =
ctx.createTestInstanceInCooldownPhase();
+
+ changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ ctx.transitionToInclusiveCooldownEnd();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Cooldown.class);
+
+ testInstance.onTrigger();
+
+ ctx.passTime(Duration.ofMillis(1));
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testDesiredChangeInCooldownPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+ final DefaultStateTransitionManager testInstance =
ctx.createTestInstanceInCooldownPhase();
+
+ changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ ctx.transitionOutOfCooldownPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testDesiredChangeInIdlingPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+ testInstance.onChange();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testDesiredChangeInStabilizedPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceInStabilizedPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ withDesiredChange(ctx, testInstance);
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testDesiredResourcesInStabilizingPhaseAfterMaxTriggerDelay() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceWithoutStabilizationTimeout(
+ manager -> {
+ manager.onChange();
+
ctx.passTime(TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT);
+ });
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ ctx.passMaxDelayTriggerTimeout();
+
+ withDesiredChange(ctx, testInstance);
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ ctx.passMaxDelayTriggerTimeout();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testNoResourcesChangeInCooldownPhase() {
+ final TestingStateTransitionManagerContext ctx =
+ TestingStateTransitionManagerContext.stableContext();
+ final DefaultStateTransitionManager testInstance =
ctx.createTestInstanceInCooldownPhase();
+
+ changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ ctx.transitionOutOfCooldownPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+ }
+
+ @Test
+ void testNoResourcesChangeInIdlingPhase() {
+ final TestingStateTransitionManagerContext ctx =
+ TestingStateTransitionManagerContext.stableContext();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ changeWithoutPhaseMove(ctx, testInstance, Idling.class);
+
+ triggerWithoutPhaseMove(ctx, testInstance, Idling.class);
+ }
+
+ @Test
+ void testSufficientResourcesInStabilizingPhaseAfterMaxTriggerDelay() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceWithoutStabilizationTimeout(
+ manager -> {
+ manager.onChange();
+
ctx.passTime(TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT);
+ });
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ ctx.passMaxDelayTriggerTimeout();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+ }
+
+ @Test
+ void testSufficientResourcesInStabilizedPhaseAfterMaxTriggerDelay() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceInStabilizedPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ ctx.passMaxDelayTriggerTimeout();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testSufficientChangeInCooldownPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
ctx.createTestInstanceInCooldownPhase();
+
+ changeWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class);
+
+ ctx.transitionOutOfCooldownPhase();
+
+ triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+ ctx.passResourceStabilizationTimeout();
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testSufficientChangeInIdlingPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+ testInstance.onChange();
+
+ triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+ ctx.passResourceStabilizationTimeout();
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testSufficientChangeInStabilizedPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceInStabilizedPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testSufficientChangeWithSubsequentDesiredChangeInStabilizingPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+ testInstance.onChange();
+
+ triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+ withDesiredChange(ctx, testInstance);
+
+ assertPhaseWithoutStateTransition(ctx, testInstance,
Stabilizing.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void
testRevokedChangeInStabilizingPhaseWithSubsequentSufficientChangeInStabilizedPhase()
{
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+
+ testInstance.onChange();
+
+ triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+ ctx.withRevokeResources();
+
+ changeWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+ triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class);
+
+ ctx.passResourceStabilizationTimeout();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ withSufficientChange(ctx, testInstance);
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ testInstance.onTrigger();
+
+ assertFinalStateTransitionHappened(ctx, testInstance);
+ }
+
+ @Test
+ void testRevokedChangeInStabilizedPhase() {
+ final TestingStateTransitionManagerContext ctx =
+
TestingStateTransitionManagerContext.stableContext().withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceInStabilizedPhase();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class);
+
+ ctx.withRevokeResources();
+
+ testInstance.onTrigger();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
+ }
+
+ @Test
+ void testScheduledTaskBeingIgnoredAfterStateChanged() {
+ final TestingStateTransitionManagerContext ctx =
+ TestingStateTransitionManagerContext.stableContext();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceInStabilizedPhase();
+
+ final AtomicBoolean callbackCalled = new AtomicBoolean();
+ testInstance.scheduleFromNow(
+ () -> callbackCalled.set(true), Duration.ZERO, new
TestPhase());
+ ctx.triggerOutdatedTasks();
+
+ assertThat(callbackCalled).isFalse();
+ }
+
+ private static class TestPhase extends Phase {
+ private TestPhase() {
+ super(Instant::now, null);
+ }
+ }
+
+ private static void assertPhaseWithoutStateTransition(
+ TestingStateTransitionManagerContext ctx,
+ DefaultStateTransitionManager testInstance,
+ Class<? extends Phase> expectedPhase) {
+ assertThat(ctx.stateTransitionWasTriggered()).isFalse();
+ assertThat(testInstance.getPhase()).isInstanceOf(expectedPhase);
+ }
+
+ private static void assertFinalStateTransitionHappened(
+ TestingStateTransitionManagerContext ctx,
DefaultStateTransitionManager testInstance) {
+ assertThat(ctx.stateTransitionWasTriggered()).isTrue();
+ assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
+ }
+
+ private static void changeWithoutPhaseMove(
+ TestingStateTransitionManagerContext ctx,
+ DefaultStateTransitionManager testInstance,
+ Class<? extends Phase> expectedPhase) {
+ assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+
+ testInstance.onChange();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+ }
+
+ private static void triggerWithoutPhaseMove(
+ TestingStateTransitionManagerContext ctx,
+ DefaultStateTransitionManager testInstance,
+ Class<? extends Phase> expectedPhase) {
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+
+ testInstance.onTrigger();
+
+ assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase);
+ }
+
+ private static void withSufficientChange(
+ TestingStateTransitionManagerContext ctx,
DefaultStateTransitionManager testInstance) {
+
+ ctx.withSufficientResources();
+ testInstance.onChange();
+ }
+
+ private static void withDesiredChange(
+ TestingStateTransitionManagerContext ctx,
DefaultStateTransitionManager testInstance) {
+
+ ctx.withDesiredResources();
+ testInstance.onChange();
+ }
+
+ /**
+ * {@code TestingStateTransitionManagerContext} provides methods for
adjusting the elapsed time
+ * and for adjusting the available resources for rescaling.
+ */
+ private static class TestingStateTransitionManagerContext
+ implements StateTransitionManager.Context {
+
+ // default configuration values to allow for easy transitioning
between the phases
+ private static final Duration COOLDOWN_TIMEOUT = Duration.ofHours(1);
+ private static final Duration RESOURCE_STABILIZATION_TIMEOUT =
Duration.ofHours(2);
+ private static final Duration MAX_TRIGGER_DELAY =
+ RESOURCE_STABILIZATION_TIMEOUT.plus(Duration.ofMinutes(10));
+
+ // configuration that defines what kind of rescaling would be possible
+ private boolean hasSufficientResources = false;
+ private boolean hasDesiredResources = false;
+
+ // internal state used for assertions
+ private final AtomicBoolean transitionTriggered = new AtomicBoolean();
+ private final SortedMap<Instant, List<ScheduledTask<Object>>>
scheduledTasks =
+ new TreeMap<>();
+
+ // Instant.MIN makes debugging easier because timestamps become
human-readable
+ private final Instant initializationTime = Instant.MIN;
+ private Duration elapsedTime = Duration.ZERO;
+
+ // ///////////////////////////////////////////////
+ // Context creation
+ // ///////////////////////////////////////////////
+
+ public static TestingStateTransitionManagerContext stableContext() {
+ return new TestingStateTransitionManagerContext();
+ }
+
+ private TestingStateTransitionManagerContext() {
+ // no rescaling is enabled by default
+ withRevokeResources();
+ }
+
+ public TestingStateTransitionManagerContext withRevokeResources() {
+ this.hasSufficientResources = false;
+ this.hasDesiredResources = false;
+
+ return this;
+ }
+
+ public TestingStateTransitionManagerContext withDesiredResources() {
+ // having desired resources should also mean that the sufficient
resources are met
+ this.hasSufficientResources = true;
+ this.hasDesiredResources = true;
+
+ return this;
+ }
+
+ public TestingStateTransitionManagerContext withSufficientResources() {
+ this.hasSufficientResources = true;
+ this.hasDesiredResources = false;
+
+ return this;
+ }
+
+ // ///////////////////////////////////////////////
+ // StateTransitionManager.Context interface methods
+ // ///////////////////////////////////////////////
+
+ @Override
+ public boolean hasSufficientResources() {
+ return this.hasSufficientResources;
+ }
+
+ @Override
+ public boolean hasDesiredResources() {
+ return this.hasDesiredResources;
+ }
+
+ @Override
+ public void transitionToSubsequentState() {
+ transitionTriggered.set(true);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleOperation(Runnable callback,
Duration delay) {
+ final Instant triggerTime =
+
Objects.requireNonNull(initializationTime).plus(elapsedTime).plus(delay);
+ if (!scheduledTasks.containsKey(triggerTime)) {
+ scheduledTasks.put(triggerTime, new ArrayList<>());
+ }
+ ScheduledTask<Object> scheduledTask =
+ new ScheduledTask<>(Executors.callable(callback),
delay.toMillis());
+ scheduledTasks.get(triggerTime).add(scheduledTask);
+ return scheduledTask;
+ }
+
+ // ///////////////////////////////////////////////
+ // Test instance creation
+ // ///////////////////////////////////////////////
+
+ /**
+ * Creates the {@code DefaultStateTransitionManager} test instance and
advances into a
+ * period in time where the instance is in cooldown phase.
+ */
+ public DefaultStateTransitionManager
createTestInstanceInCooldownPhase() {
+ return createTestInstance(ignored ->
this.transitionIntoCooldownTimeframe());
+ }
+
+ /**
+ * Creates the {@code DefaultStateTransitionManager} test instance and
advances into a
+ * period in time where the instance is in stabilizing phase.
+ */
+ public DefaultStateTransitionManager
createTestInstanceThatPassedCooldownPhase() {
+ return createTestInstance(ignored ->
this.transitionOutOfCooldownPhase());
+ }
+
+ /**
+ * Creates the {@code DefaultStateTransitionManager} test instance and
advances into a
+ * period in time where the instance is in stabilized phase.
+ */
+ public DefaultStateTransitionManager
createTestInstanceInStabilizedPhase() {
+ return createTestInstance(
+ manager -> {
+ manager.onChange();
+ passResourceStabilizationTimeout();
+ });
+ }
+
+ /**
+ * Creates the {@code DefaultStateTransitionManager} test instance in
terminal transitioning
+ * phase.
+ */
+ public DefaultStateTransitionManager
createTestInstanceInTransitioningPhase() {
+ return createTestInstance(
+ manager -> {
+ manager.onChange();
+ passResourceStabilizationTimeout();
+ manager.onTrigger();
+ clearStateTransition();
+ });
+ }
+
+ /**
+ * Initializes the test instance with set stabilization timeout and
sets the context's
+ * elapsed time based on the passed callback.
+ */
+ public DefaultStateTransitionManager createTestInstance(
+ Consumer<DefaultStateTransitionManager> callback) {
+ return createTestInstance(callback,
RESOURCE_STABILIZATION_TIMEOUT);
+ }
+
+ /**
+ * Initializes the test instance without stabilization timeout and
sets the context's
+ * elapsed time based on the passed callback.
+ */
+ public DefaultStateTransitionManager
createTestInstanceWithoutStabilizationTimeout(
+ Consumer<DefaultStateTransitionManager> callback) {
+ return createTestInstance(callback, null);
+ }
+
+ private DefaultStateTransitionManager createTestInstance(
+ Consumer<DefaultStateTransitionManager> callback,
+ @Nullable Duration resourceStabilizationTimeout) {
+ final DefaultStateTransitionManager testInstance =
+ new DefaultStateTransitionManager(
+ initializationTime,
+ // clock that returns the time based on the
configured elapsedTime
+ () ->
Objects.requireNonNull(initializationTime).plus(elapsedTime),
+ this,
+
TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT,
+ resourceStabilizationTimeout,
+
TestingStateTransitionManagerContext.MAX_TRIGGER_DELAY) {
+ @Override
+ public void onChange() {
+ super.onChange();
+
+ // hack to avoid calling this method in every test
method
+ // we want to trigger tasks that are meant to run
right-away
+
TestingStateTransitionManagerContext.this.triggerOutdatedTasks();
+ }
+
+ @Override
+ public void onTrigger() {
+ super.onTrigger();
+
+ // hack to avoid calling this method in every test
method
+ // we want to trigger tasks that are meant to run
right-away
+
TestingStateTransitionManagerContext.this.triggerOutdatedTasks();
+ }
+ };
+
+ callback.accept(testInstance);
+ return testInstance;
+ }
+
+ // ///////////////////////////////////////////////
+ // Time-adjustment functionality
+ // ///////////////////////////////////////////////
+
+ /**
+ * Transitions the context's time to a moment that falls into the test
instance's cooldown
+ * phase.
+ */
+ public void transitionIntoCooldownTimeframe() {
+ setElapsedTime(COOLDOWN_TIMEOUT.dividedBy(2));
+ this.triggerOutdatedTasks();
+ }
+
+ public void transitionOutOfCooldownPhase() {
+ this.setElapsedTime(COOLDOWN_TIMEOUT.plusMillis(1));
+ }
+
+ public void passResourceStabilizationTimeout() {
+ // resource stabilization is based on the current time
+ this.passTime(RESOURCE_STABILIZATION_TIMEOUT.plusMillis(1));
+ }
+
+ public void transitionToInclusiveCooldownEnd() {
+ setElapsedTime(COOLDOWN_TIMEOUT.minusMillis(1));
+ }
+
+ public void passMaxDelayTriggerTimeout() {
+ this.passTime(MAX_TRIGGER_DELAY.plusMillis(1));
+ }
+
+ public void passTime(Duration elapsed) {
+ setElapsedTime(this.elapsedTime.plus(elapsed));
+ }
+
+ public void setElapsedTime(Duration elapsedTime) {
+ Preconditions.checkState(
+ this.elapsedTime.compareTo(elapsedTime) <= 0,
+ "The elapsed time should monotonically increase.");
+ this.elapsedTime = elapsedTime;
+ this.triggerOutdatedTasks();
+ }
+
+ private void triggerOutdatedTasks() {
+ while (!scheduledTasks.isEmpty()) {
+ final Instant timeOfExecution = scheduledTasks.firstKey();
+ if (!timeOfExecution.isAfter(
+
Objects.requireNonNull(initializationTime).plus(elapsedTime))) {
+
scheduledTasks.remove(timeOfExecution).forEach(ScheduledTask::execute);
+ } else {
+ break;
+ }
+ }
+ }
+
+ // ///////////////////////////////////////////////
+ // Methods for verifying the context's state
+ // ///////////////////////////////////////////////
+
+ public boolean stateTransitionWasTriggered() {
+ return transitionTriggered.get();
+ }
+
+ public void clearStateTransition() {
+ transitionTriggered.set(false);
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 48ee70bfd29..b49c1c9b35d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -155,7 +155,7 @@ class ExecutingTest {
ctx,
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
- TestingRescaleManager.Factory.noOpFactory(),
+ TestingStateTransitionManager.Factory.noOpFactory(),
1,
1,
Instant.now());
@@ -183,7 +183,7 @@ class ExecutingTest {
ctx,
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
-
TestingRescaleManager.Factory.noOpFactory(),
+
TestingStateTransitionManager.Factory.noOpFactory(),
1,
1,
Instant.now());
@@ -195,12 +195,13 @@ class ExecutingTest {
@Test
public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
final AtomicBoolean rescaleTriggered = new AtomicBoolean();
- final RescaleManager.Factory rescaleManagerFactory =
- new TestingRescaleManager.Factory(() -> {}, () ->
rescaleTriggered.set(true));
+ final StateTransitionManager.Factory stateTransitionManagerFactory =
+ new TestingStateTransitionManager.Factory(
+ () -> {}, () -> rescaleTriggered.set(true));
try (MockExecutingContext ctx = new MockExecutingContext()) {
final Executing testInstance =
new ExecutingStateBuilder()
- .setRescaleManagerFactory(rescaleManagerFactory)
+
.setStateTransitionManagerFactory(stateTransitionManagerFactory)
.build(ctx);
assertThat(rescaleTriggered).isFalse();
@@ -212,13 +213,14 @@ class ExecutingTest {
@Test
public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
final AtomicInteger rescaleTriggerCount = new AtomicInteger();
- final RescaleManager.Factory rescaleManagerFactory =
- new TestingRescaleManager.Factory(() -> {},
rescaleTriggerCount::incrementAndGet);
+ final StateTransitionManager.Factory stateTransitionManagerFactory =
+ new TestingStateTransitionManager.Factory(
+ () -> {}, rescaleTriggerCount::incrementAndGet);
final int rescaleOnFailedCheckpointsCount = 3;
try (MockExecutingContext ctx = new MockExecutingContext()) {
final Executing testInstance =
new ExecutingStateBuilder()
- .setRescaleManagerFactory(rescaleManagerFactory)
+
.setStateTransitionManagerFactory(stateTransitionManagerFactory)
.setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount)
.build(ctx);
@@ -254,13 +256,14 @@ class ExecutingTest {
@Test
public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws
Exception {
final AtomicInteger rescaleTriggeredCount = new AtomicInteger();
- final RescaleManager.Factory rescaleManagerFactory =
- new TestingRescaleManager.Factory(() -> {},
rescaleTriggeredCount::incrementAndGet);
+ final StateTransitionManager.Factory stateTransitionManagerFactory =
+ new TestingStateTransitionManager.Factory(
+ () -> {}, rescaleTriggeredCount::incrementAndGet);
final int rescaleOnFailedCheckpointsCount = 3;
try (MockExecutingContext ctx = new MockExecutingContext()) {
final Executing testInstance =
new ExecutingStateBuilder()
- .setRescaleManagerFactory(rescaleManagerFactory)
+
.setStateTransitionManagerFactory(stateTransitionManagerFactory)
.setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount)
.build(ctx);
@@ -581,8 +584,8 @@ class ExecutingTest {
final Queue<String> actualEvents = new ArrayDeque<>();
try (MockExecutingContext ctx = new MockExecutingContext()) {
new ExecutingStateBuilder()
- .setRescaleManagerFactory(
- new TestingRescaleManager.Factory(
+ .setStateTransitionManagerFactory(
+ new TestingStateTransitionManager.Factory(
() -> actualEvents.add(onChangeEventLabel),
() ->
actualEvents.add(onTriggerEventLabel)))
.build(ctx);
@@ -606,8 +609,8 @@ class ExecutingTest {
TestingDefaultExecutionGraphBuilder.newBuilder()
.build(EXECUTOR_EXTENSION.getExecutor());
private OperatorCoordinatorHandler operatorCoordinatorHandler;
- private RescaleManager.Factory rescaleManagerFactory =
- TestingRescaleManager.Factory.noOpFactory();
+ private StateTransitionManager.Factory stateTransitionManagerFactory =
+ TestingStateTransitionManager.Factory.noOpFactory();
private int rescaleOnFailedCheckpointCount = 1;
private ExecutingStateBuilder() throws JobException,
JobExecutionException {
@@ -625,9 +628,9 @@ class ExecutingTest {
return this;
}
- public ExecutingStateBuilder setRescaleManagerFactory(
- RescaleManager.Factory rescaleManagerFactory) {
- this.rescaleManagerFactory = rescaleManagerFactory;
+ public ExecutingStateBuilder setStateTransitionManagerFactory(
+ StateTransitionManager.Factory stateTransitionManagerFactory) {
+ this.stateTransitionManagerFactory = stateTransitionManagerFactory;
return this;
}
@@ -649,7 +652,7 @@ class ExecutingTest {
ctx,
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
- rescaleManagerFactory,
+ stateTransitionManagerFactory,
1,
rescaleOnFailedCheckpointCount,
// will be ignored by the TestingRescaleManager.Factory
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
similarity index 70%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
index cc435ea4f7b..3f4573cd74e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.scheduler.adaptive;
import java.time.Instant;
-public class TestingRescaleManager implements RescaleManager {
+/** Testing implementation for {@link StateTransitionManager}. */
+public class TestingStateTransitionManager implements StateTransitionManager {
private final Runnable onChangeRunnable;
private final Runnable onTriggerRunnable;
- private TestingRescaleManager(Runnable onChangeRunnable, Runnable
onTriggerRunnable) {
+ private TestingStateTransitionManager(Runnable onChangeRunnable, Runnable
onTriggerRunnable) {
this.onChangeRunnable = onChangeRunnable;
this.onTriggerRunnable = onTriggerRunnable;
}
@@ -40,12 +41,15 @@ public class TestingRescaleManager implements
RescaleManager {
this.onTriggerRunnable.run();
}
- public static class Factory implements RescaleManager.Factory {
+ /**
+ * {@code Factory} implementation for creating {@code
TestingStateTransitionManager} instances.
+ */
+ public static class Factory implements StateTransitionManager.Factory {
private final Runnable onChangeRunnable;
private final Runnable onTriggerRunnable;
- public static TestingRescaleManager.Factory noOpFactory() {
+ public static TestingStateTransitionManager.Factory noOpFactory() {
return new Factory(() -> {}, () -> {});
}
@@ -55,8 +59,8 @@ public class TestingRescaleManager implements RescaleManager {
}
@Override
- public RescaleManager create(Context ignoredContext, Instant
ignoredLastRescale) {
- return new TestingRescaleManager(onChangeRunnable,
onTriggerRunnable);
+ public StateTransitionManager create(Context ignoredContext, Instant
ignoredLastRescale) {
+ return new TestingStateTransitionManager(onChangeRunnable,
onTriggerRunnable);
}
}
}