This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1f303a218f4 [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state
1f303a218f4 is described below
commit 1f303a218f47aa1a8eaf1323fc2a5debe914762d
Author: Zdenek Tison <[email protected]>
AuthorDate: Fri Jul 19 11:00:10 2024 +0200
[FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state
---
.../scheduler/adaptive/AdaptiveScheduler.java | 50 +++-
.../adaptive/DefaultStateTransitionManager.java | 88 +++----
.../runtime/scheduler/adaptive/Executing.java | 12 +-
.../scheduler/adaptive/StateTransitionManager.java | 11 -
.../scheduler/adaptive/WaitingForResources.java | 109 ++++-----
.../adaptive/AdaptiveSchedulerBuilder.java | 8 +-
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 193 +++++++++++++---
.../DefaultStateTransitionManagerTest.java | 34 +--
.../runtime/scheduler/adaptive/ExecutingTest.java | 51 +++--
.../adaptive/TestingStateTransitionManager.java | 39 ++--
.../adaptive/WaitingForResourcesTest.java | 255 ++++++++-------------
11 files changed, 445 insertions(+), 405 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index ebcfc369c82..b5b98984a11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -136,6 +136,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -185,6 +187,24 @@ public class AdaptiveScheduler
private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveScheduler.class);
+ /**
+ * Named callback interface for creating {@code StateTransitionManager}
instances. This internal
+ * interface allows for easier testing of the parameter injection in a
unit test.
+ *
+ * @see
+ *
DefaultStateTransitionManager#DefaultStateTransitionManager(StateTransitionManager.Context,
+ * Duration, Duration, Duration, Temporal)
+ */
+ @FunctionalInterface
+ interface StateTransitionManagerFactory {
+ StateTransitionManager create(
+ StateTransitionManager.Context context,
+ Duration cooldownTimeout,
+ @Nullable Duration resourceStabilizationTimeout,
+ Duration maximumDelayForTrigger,
+ Temporal lastStateTransition);
+ }
+
/**
* Consolidated settings for the adaptive scheduler. This class is used to
avoid passing around
* multiple config options.
@@ -349,7 +369,7 @@ public class AdaptiveScheduler
}
private final Settings settings;
- private final StateTransitionManager.Factory stateTransitionManagerFactory;
+ private final StateTransitionManagerFactory stateTransitionManagerFactory;
private final JobGraph jobGraph;
@@ -427,7 +447,7 @@ public class AdaptiveScheduler
throws JobExecutionException {
this(
settings,
- DefaultStateTransitionManager.Factory.fromSettings(settings),
+ DefaultStateTransitionManager::new,
(metricGroup, checkpointStatsListener) ->
new DefaultCheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
@@ -455,7 +475,7 @@ public class AdaptiveScheduler
@VisibleForTesting
AdaptiveScheduler(
Settings settings,
- StateTransitionManager.Factory stateTransitionManagerFactory,
+ StateTransitionManagerFactory stateTransitionManagerFactory,
BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener,
CheckpointStatsTracker>
checkpointStatsTrackerFactory,
JobGraph jobGraph,
@@ -1143,10 +1163,20 @@ public class AdaptiveScheduler
this,
LOG,
settings.getInitialResourceAllocationTimeout(),
- settings.getResourceStabilizationTimeout(),
+ this::createWaitingForResourceStateTransitionManager,
previousExecutionGraph));
}
+ private StateTransitionManager
createWaitingForResourceStateTransitionManager(
+ StateTransitionManager.Context ctx) {
+ return stateTransitionManagerFactory.create(
+ ctx,
+ Duration.ZERO, // skip cooldown phase
+ settings.getResourceStabilizationTimeout(),
+ Duration.ZERO, // trigger immediately once the stabilization
phase is over
+ Instant.now());
+ }
+
private void declareDesiredResources() {
final ResourceCounter newDesiredResources =
calculateDesiredResources();
@@ -1175,11 +1205,21 @@ public class AdaptiveScheduler
this,
userCodeClassLoader,
failureCollection,
- stateTransitionManagerFactory,
+ this::createExecutingStateTransitionManager,
settings.getMinParallelismChangeForDesiredRescale(),
settings.getRescaleOnFailedCheckpointCount()));
}
+ private StateTransitionManager createExecutingStateTransitionManager(
+ StateTransitionManager.Context ctx, Instant lastRescaleTimestamp) {
+ return stateTransitionManagerFactory.create(
+ ctx,
+ settings.getScalingIntervalMin(),
+ settings.getScalingIntervalMax(),
+ settings.getMaximumDelayForTriggeringRescale(),
+ lastRescaleTimestamp);
+ }
+
@Override
public void goToCanceling(
ExecutionGraph executionGraph,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
index aa1ea2c965d..df7969572b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -69,42 +69,60 @@ public class DefaultStateTransitionManager implements StateTransitionManager {
private final StateTransitionManager.Context transitionContext;
private Phase phase;
private final List<ScheduledFuture<?>> scheduledFutures;
+ @Nullable private final Duration resourceStabilizationTimeout;
+ private final Duration maxTriggerDelay;
- @VisibleForTesting final Duration cooldownTimeout;
- @Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
- @VisibleForTesting final Duration maxTriggerDelay;
-
+ /**
+ * Creates a {@code DefaultStateTransitionManager} instance with the given
parameters.
+ *
+ * @param transitionContext The context for the {@code
StateTransitionManager}.
+ * @param cooldownTimeout The timeout for the cooldown phase.
+ * @param resourceStabilizationTimeout The timeout for the resource
stabilization phase.
+ * @param maxTriggerDelay The maximum delay for triggering a {@link
AdaptiveScheduler}'s state
+ * transition if only sufficient resources are available.
+ * @param initializationTime The last state transition timestamp of {@link
AdaptiveScheduler}'s
+ * state machine.
+ */
DefaultStateTransitionManager(
- Temporal initializationTime,
- StateTransitionManager.Context transitionContext,
+ Context transitionContext,
Duration cooldownTimeout,
@Nullable Duration resourceStabilizationTimeout,
- Duration maxTriggerDelay) {
+ Duration maxTriggerDelay,
+ Temporal initializationTime) {
this(
- initializationTime,
Instant::now,
transitionContext,
cooldownTimeout,
resourceStabilizationTimeout,
- maxTriggerDelay);
+ maxTriggerDelay,
+ initializationTime);
}
@VisibleForTesting
DefaultStateTransitionManager(
- Temporal initializationTime,
Supplier<Temporal> clock,
- StateTransitionManager.Context transitionContext,
+ Context transitionContext,
Duration cooldownTimeout,
@Nullable Duration resourceStabilizationTimeout,
- Duration maxTriggerDelay) {
+ Duration maxTriggerDelay,
+ Temporal initializationTime) {
this.clock = clock;
+ Preconditions.checkArgument(
+ !maxTriggerDelay.isNegative(), "Max trigger delay must not be
negative");
this.maxTriggerDelay = maxTriggerDelay;
- this.cooldownTimeout = cooldownTimeout;
+ Preconditions.checkArgument(
+ resourceStabilizationTimeout == null ||
!resourceStabilizationTimeout.isNegative(),
+ "Resource stabilization timeout must not be negative");
this.resourceStabilizationTimeout = resourceStabilizationTimeout;
- this.transitionContext = transitionContext;
+ this.transitionContext = Preconditions.checkNotNull(transitionContext);
this.scheduledFutures = new ArrayList<>();
- this.phase = new Cooldown(initializationTime, clock, this,
cooldownTimeout);
+ this.phase =
+ new Cooldown(
+ Preconditions.checkNotNull(initializationTime),
+ clock,
+ this,
+ Preconditions.checkNotNull(cooldownTimeout));
}
@Override
@@ -176,46 +194,6 @@ public class DefaultStateTransitionManager implements StateTransitionManager {
}
}
- /** Factory for creating {@link DefaultStateTransitionManager} instances.
*/
- public static class Factory implements StateTransitionManager.Factory {
-
- private final Duration cooldownTimeout;
- @Nullable private final Duration resourceStabilizationTimeout;
- private final Duration maximumDelayForTrigger;
-
- /**
- * Creates a {@code Factory} instance based on the {@link
AdaptiveScheduler}'s {@code
- * Settings} for rescaling.
- */
- public static Factory fromSettings(AdaptiveScheduler.Settings
settings) {
- // it's not ideal that we use a AdaptiveScheduler internal class
here. We might want to
- // change that as part of a more general alignment of the
rescaling configuration.
- return new Factory(
- settings.getScalingIntervalMin(),
- settings.getScalingIntervalMax(),
- settings.getMaximumDelayForTriggeringRescale());
- }
-
- private Factory(
- Duration cooldownTimeout,
- @Nullable Duration resourceStabilizationTimeout,
- Duration maximumDelayForTrigger) {
- this.cooldownTimeout = cooldownTimeout;
- this.resourceStabilizationTimeout = resourceStabilizationTimeout;
- this.maximumDelayForTrigger = maximumDelayForTrigger;
- }
-
- @Override
- public DefaultStateTransitionManager create(Context context, Instant
lastStateTransition) {
- return new DefaultStateTransitionManager(
- lastStateTransition,
- context,
- cooldownTimeout,
- resourceStabilizationTimeout,
- maximumDelayForTrigger);
- }
- }
-
/**
* A phase in the state machine of the {@link
DefaultStateTransitionManager}. Each phase is
* responsible for a specific part of the state transition process.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index f7dcb8d97cf..e78bcfac10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -54,6 +54,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
/** State which represents a running job with an {@link ExecutionGraph} and
assigned slots. */
@@ -77,7 +78,8 @@ class Executing extends StateWithExecutionGraph
Context context,
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
- StateTransitionManager.Factory stateTransitionManagerFactory,
+ BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory,
int minParallelismChangeForRescale,
int rescaleOnFailedCheckpointCount,
Instant lastRescale) {
@@ -96,7 +98,7 @@ class Executing extends StateWithExecutionGraph
this.sufficientResourcesController = new
EnforceParallelismChangeRescalingController();
this.desiredResourcesController =
new
EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale);
- this.stateTransitionManager =
stateTransitionManagerFactory.create(this, lastRescale);
+ this.stateTransitionManager =
stateTransitionManagerFactory.apply(this, lastRescale);
Preconditions.checkArgument(
rescaleOnFailedCheckpointCount > 0,
@@ -328,7 +330,8 @@ class Executing extends StateWithExecutionGraph
private final OperatorCoordinatorHandler operatorCoordinatorHandler;
private final ClassLoader userCodeClassLoader;
private final List<ExceptionHistoryEntry> failureCollection;
- private final StateTransitionManager.Factory
stateTransitionManagerFactory;
+ private final BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory;
private final int minParallelismChangeForRescale;
private final int rescaleOnFailedCheckpointCount;
@@ -340,7 +343,8 @@ class Executing extends StateWithExecutionGraph
Context context,
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
- StateTransitionManager.Factory stateTransitionManagerFactory,
+ BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory,
int minParallelismChangeForRescale,
int rescaleOnFailedCheckpointCount) {
this.context = context;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
index a6a6aaaca2e..322e8aae373 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.scheduler.adaptive;
import java.time.Duration;
-import java.time.Instant;
import java.util.concurrent.ScheduledFuture;
/**
@@ -70,14 +69,4 @@ public interface StateTransitionManager {
*/
ScheduledFuture<?> scheduleOperation(Runnable callback, Duration
delay);
}
-
- /** Interface for creating {@code StateTransitionManager} instances. */
- interface Factory {
-
- /**
- * Creates a {@code StateTransitionManager} instance for the given
{@code Context} and
- * previous state transition time.
- */
- StateTransitionManager create(Context context, Instant
lastStateTransition);
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index d7933984db3..01f084bc093 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -20,11 +20,8 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.clock.Clock;
-import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
@@ -32,57 +29,43 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
+import java.util.function.Function;
/**
* State which describes that the scheduler is waiting for resources in order
to execute the job.
*/
-class WaitingForResources extends StateWithoutExecutionGraph implements
ResourceListener {
+class WaitingForResources extends StateWithoutExecutionGraph
+ implements ResourceListener, StateTransitionManager.Context {
private final Context context;
- private final Clock clock;
-
- /** If set, there's an ongoing deadline waiting for a resource
stabilization. */
- @Nullable private Deadline resourceStabilizationDeadline;
-
- private final Duration resourceStabilizationTimeout;
-
@Nullable private ScheduledFuture<?> resourceTimeoutFuture;
@Nullable private final ExecutionGraph previousExecutionGraph;
+ private final StateTransitionManager stateTransitionManager;
+
@VisibleForTesting
WaitingForResources(
Context context,
Logger log,
Duration initialResourceAllocationTimeout,
- Duration resourceStabilizationTimeout) {
- this(
- context,
- log,
- initialResourceAllocationTimeout,
- resourceStabilizationTimeout,
- SystemClock.getInstance(),
- null);
+ Function<StateTransitionManager.Context, StateTransitionManager>
+ stateTransitionManagerFactory) {
+ this(context, log, initialResourceAllocationTimeout, null,
stateTransitionManagerFactory);
}
WaitingForResources(
Context context,
Logger log,
Duration initialResourceAllocationTimeout,
- Duration resourceStabilizationTimeout,
- Clock clock,
- @Nullable ExecutionGraph previousExecutionGraph) {
+ @Nullable ExecutionGraph previousExecutionGraph,
+ Function<StateTransitionManager.Context, StateTransitionManager>
+ stateTransitionManagerFactory) {
super(context, log);
this.context = Preconditions.checkNotNull(context);
- this.resourceStabilizationTimeout =
- Preconditions.checkNotNull(resourceStabilizationTimeout);
- this.clock = clock;
Preconditions.checkNotNull(initialResourceAllocationTimeout);
-
- Preconditions.checkArgument(
- !resourceStabilizationTimeout.isNegative(),
- "Resource stabilization timeout must not be negative");
+ this.stateTransitionManager =
stateTransitionManagerFactory.apply(this);
// since state transitions are not allowed in state constructors,
schedule calls for later.
if (!initialResourceAllocationTimeout.isNegative()) {
@@ -91,7 +74,7 @@ class WaitingForResources extends StateWithoutExecutionGraph implements Resource
this, this::resourceTimeout,
initialResourceAllocationTimeout);
}
this.previousExecutionGraph = previousExecutionGraph;
- context.runIfState(this,
this::checkDesiredOrSufficientResourcesAvailable, Duration.ZERO);
+ context.runIfState(this, this::checkPotentialStateTransition,
Duration.ZERO);
}
@Override
@@ -99,6 +82,8 @@ class WaitingForResources extends StateWithoutExecutionGraph implements Resource
if (resourceTimeoutFuture != null) {
resourceTimeoutFuture.cancel(false);
}
+ stateTransitionManager.close();
+ super.onLeave(newState);
}
@Override
@@ -108,51 +93,46 @@ class WaitingForResources extends StateWithoutExecutionGraph implements Resource
@Override
public void onNewResourcesAvailable() {
- checkDesiredOrSufficientResourcesAvailable();
+ checkPotentialStateTransition();
}
@Override
public void onNewResourceRequirements() {
- checkDesiredOrSufficientResourcesAvailable();
+ checkPotentialStateTransition();
}
- private void checkDesiredOrSufficientResourcesAvailable() {
- if (context.hasDesiredResources()) {
- createExecutionGraphWithAvailableResources();
- return;
- }
-
- if (context.hasSufficientResources()) {
- if (resourceStabilizationDeadline == null) {
- resourceStabilizationDeadline =
-
Deadline.fromNowWithClock(resourceStabilizationTimeout, clock);
- }
- if (resourceStabilizationDeadline.isOverdue()) {
- createExecutionGraphWithAvailableResources();
- } else {
- // schedule next resource check
- context.runIfState(
- this,
- this::checkDesiredOrSufficientResourcesAvailable,
- resourceStabilizationDeadline.timeLeft());
- }
- } else {
- // clear deadline due to insufficient resources
- resourceStabilizationDeadline = null;
- }
+ private void checkPotentialStateTransition() {
+ stateTransitionManager.onChange();
+ stateTransitionManager.onTrigger();
}
private void resourceTimeout() {
getLogger()
.debug(
"Initial resource allocation timeout triggered:
Creating ExecutionGraph with available resources.");
- createExecutionGraphWithAvailableResources();
+ transitionToSubsequentState();
+ }
+
+ @Override
+ public boolean hasSufficientResources() {
+ return context.hasSufficientResources();
+ }
+
+ @Override
+ public boolean hasDesiredResources() {
+ return context.hasDesiredResources();
}
- private void createExecutionGraphWithAvailableResources() {
+ @Override
+ public void transitionToSubsequentState() {
context.goToCreatingExecutionGraph(previousExecutionGraph);
}
+ @Override
+ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration
delay) {
+ return context.runIfState(this, callback, delay);
+ }
+
/** Context of the {@link WaitingForResources} state. */
interface Context
extends StateWithoutExecutionGraph.Context,
StateTransitions.ToCreatingExecutionGraph {
@@ -188,20 +168,22 @@ class WaitingForResources extends StateWithoutExecutionGraph implements Resource
private final Context context;
private final Logger log;
private final Duration initialResourceAllocationTimeout;
- private final Duration resourceStabilizationTimeout;
@Nullable private final ExecutionGraph previousExecutionGraph;
+ private final Function<StateTransitionManager.Context,
StateTransitionManager>
+ stateTransitionManagerFactory;
public Factory(
Context context,
Logger log,
Duration initialResourceAllocationTimeout,
- Duration resourceStabilizationTimeout,
+ Function<StateTransitionManager.Context,
StateTransitionManager>
+ stateTransitionManagerFactory,
@Nullable ExecutionGraph previousExecutionGraph) {
this.context = context;
this.log = log;
this.initialResourceAllocationTimeout =
initialResourceAllocationTimeout;
- this.resourceStabilizationTimeout = resourceStabilizationTimeout;
this.previousExecutionGraph = previousExecutionGraph;
+ this.stateTransitionManagerFactory = stateTransitionManagerFactory;
}
public Class<WaitingForResources> getStateClass() {
@@ -213,9 +195,8 @@ class WaitingForResources extends StateWithoutExecutionGraph implements Resource
context,
log,
initialResourceAllocationTimeout,
- resourceStabilizationTimeout,
- SystemClock.getInstance(),
- previousExecutionGraph);
+ previousExecutionGraph,
+ stateTransitionManagerFactory);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 761dac96155..f35683c3f7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -98,7 +98,8 @@ public class AdaptiveSchedulerBuilder {
/**
* {@code null} indicates that the default factory will be used based on
the set configuration.
*/
- @Nullable private StateTransitionManager.Factory
stateTransitionManagerFactory = null;
+ @Nullable
+ private AdaptiveScheduler.StateTransitionManagerFactory
stateTransitionManagerFactory = null;
private BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener,
CheckpointStatsTracker>
checkpointStatsTrackerFactory =
@@ -226,7 +227,8 @@ public class AdaptiveSchedulerBuilder {
}
public AdaptiveSchedulerBuilder setStateTransitionManagerFactory(
- @Nullable StateTransitionManager.Factory
stateTransitionManagerFactory) {
+ @Nullable
+ AdaptiveScheduler.StateTransitionManagerFactory
stateTransitionManagerFactory) {
this.stateTransitionManagerFactory = stateTransitionManagerFactory;
return this;
}
@@ -261,7 +263,7 @@ public class AdaptiveSchedulerBuilder {
return new AdaptiveScheduler(
settings,
stateTransitionManagerFactory == null
- ?
DefaultStateTransitionManager.Factory.fromSettings(settings)
+ ? DefaultStateTransitionManager::new
: stateTransitionManagerFactory,
checkpointStatsTrackerFactory,
jobGraph,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 5bc3d859c48..88a0719bbbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
@@ -130,6 +131,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
+import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -149,6 +151,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -2247,10 +2250,126 @@ public class AdaptiveSchedulerTest {
assertThat(eventQueue.take()).as("Only one event should have been
observed.").isEqualTo(1);
}
+ @Test
+ void testGoToWaitingForResourcesConfiguresStateTransitionManagerFactory()
throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
+ final TestingStateTransitionManagerFactory factory =
+ new TestingStateTransitionManagerFactory(
+ ctx ->
+
TestingStateTransitionManager.withOnChangeEventOnly(
+ () -> {
+ if (ctx instanceof
WaitingForResources) {
+ latch.trigger();
+ }
+ }));
+
+ final Configuration configuration = new Configuration();
+ final Duration resourceStabilizationTimeout = Duration.ofMillis(10L);
+ configuration.set(
+ JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT,
resourceStabilizationTimeout);
+
+ scheduler =
+ new AdaptiveSchedulerBuilder(
+ createJobGraph(),
+ singleThreadMainThreadExecutor,
+ EXECUTOR_RESOURCE.getExecutor())
+ .setStateTransitionManagerFactory(factory)
+ .setJobMasterConfiguration(configuration)
+ .build();
+
+ // start scheduling to reach Executing state
+ singleThreadMainThreadExecutor.execute(scheduler::startScheduling);
+
+ // let's wait for the onChange event in Executing state.
+ latch.await();
+
+
assertThat(scheduler.getState()).isInstanceOf(WaitingForResources.class);
+ assertThat(factory.cooldownTimeout).isEqualTo(Duration.ZERO);
+ assertThat(factory.maximumDelayForTrigger).isEqualTo(Duration.ZERO);
+
assertThat(factory.resourceStabilizationTimeout).isEqualTo(resourceStabilizationTimeout);
+ }
+
+ @Test
+ void testGoToExecutingConfiguresStateTransitionManagerFactory() throws
Exception {
+ final OneShotLatch latch = new OneShotLatch();
+ final TestingStateTransitionManagerFactory factory =
+ new TestingStateTransitionManagerFactory(
+ ctx ->
+
TestingStateTransitionManager.withOnChangeEventOnly(
+ () -> {
+ if (ctx instanceof
WaitingForResources) {
+
ctx.transitionToSubsequentState();
+ }
+ if (ctx instanceof Executing) {
+ latch.trigger();
+ }
+ }));
+
+ final Configuration configuration = new Configuration();
+ final Duration scalingIntervalMin = Duration.ofMillis(1L);
+ final Duration scalingIntervalMax = Duration.ofMillis(5L);
+ final Duration maxDelayForTrigger = Duration.ofMillis(10L);
+
+ configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN,
scalingIntervalMin);
+ configuration.set(JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER,
maxDelayForTrigger);
+ configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX,
scalingIntervalMax);
+
+ scheduler =
+ new AdaptiveSchedulerBuilder(
+ createJobGraph(),
+ singleThreadMainThreadExecutor,
+ EXECUTOR_RESOURCE.getExecutor())
+ .setJobMasterConfiguration(configuration)
+
.setDeclarativeSlotPool(getSlotPoolWithFreeSlots(PARALLELISM))
+ .setStateTransitionManagerFactory(factory)
+ .build();
+
+ // start scheduling to reach Executing state
+ singleThreadMainThreadExecutor.execute(scheduler::startScheduling);
+
+ // let's wait for the onChange event in Executing state.
+ latch.await();
+
+ assertThat(scheduler.getState()).isInstanceOf(Executing.class);
+ assertThat(factory.cooldownTimeout).isEqualTo(scalingIntervalMin);
+
assertThat(factory.maximumDelayForTrigger).isEqualTo(maxDelayForTrigger);
+
assertThat(factory.resourceStabilizationTimeout).isEqualTo(scalingIntervalMax);
+ }
+
//
---------------------------------------------------------------------------------------------
// Utils
//
---------------------------------------------------------------------------------------------
+ private static class TestingStateTransitionManagerFactory
+ implements AdaptiveScheduler.StateTransitionManagerFactory {
+
+ private final Function<StateTransitionManager.Context,
StateTransitionManager>
+ stateTransitionManagerCreator;
+
+ private Duration cooldownTimeout;
+ private Duration resourceStabilizationTimeout;
+ private Duration maximumDelayForTrigger;
+
+ public TestingStateTransitionManagerFactory(
+ Function<StateTransitionManager.Context,
StateTransitionManager>
+ stateTransitionManagerCreator) {
+ this.stateTransitionManagerCreator = stateTransitionManagerCreator;
+ }
+
+ public StateTransitionManager create(
+ StateTransitionManager.Context context,
+ Duration cooldownTimeout,
+ @Nullable Duration resourceStabilizationTimeout,
+ Duration maximumDelayForTrigger,
+ Temporal ignoredTimestamp) {
+ this.cooldownTimeout = cooldownTimeout;
+ this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+ this.maximumDelayForTrigger = maximumDelayForTrigger;
+
+ return stateTransitionManagerCreator.apply(context);
+ }
+ }
+
private AdaptiveScheduler createSchedulerThatReachesExecutingState(
int parallelism,
int onFailedCheckpointCount,
@@ -2274,30 +2393,7 @@ public class AdaptiveSchedulerTest {
.build();
SchedulerTestingUtils.enableCheckpointing(jobGraph);
- // testing SlotPool instance that would allow for the scheduler to
transition to Executing
- // state
- final DeclarativeSlotPool slotPool =
- new TestingDeclarativeSlotPoolBuilder()
- .setContainsFreeSlotFunction(allocationID -> true)
- .setReserveFreeSlotFunction(
- (allocationId, resourceProfile) ->
- TestingPhysicalSlot.builder()
- .withAllocationID(allocationId)
- .build())
- .setGetFreeSlotTrackerSupplier(
- () ->
- TestingFreeSlotTracker.newBuilder()
-
.setGetFreeSlotsInformationSupplier(
- () ->
-
IntStream.range(0, parallelism)
-
.mapToObj(
-
v ->
-
new TestingSlot())
-
.collect(
-
Collectors.toSet()))
- .build())
- .build();
-
+ final DeclarativeSlotPool slotPool =
getSlotPoolWithFreeSlots(parallelism);
final AtomicInteger eventCounter = new AtomicInteger();
scheduler =
new AdaptiveSchedulerBuilder(
@@ -2307,14 +2403,23 @@ public class AdaptiveSchedulerTest {
.setJobMasterConfiguration(config)
.setDeclarativeSlotPool(slotPool)
.setStateTransitionManagerFactory(
- new TestingStateTransitionManager.Factory(
- () -> {},
- () -> {
- singleThreadMainThreadExecutor
-
.assertRunningInMainThread();
-
-
eventQueue.offer(eventCounter.getAndIncrement());
- }))
+ (context,
+ ignoredCooldown,
+ ignoredResourceStabilizationTimeout,
+ ignoredMaxTriggerDelay,
+ ignoredTimestamp) ->
+
TestingStateTransitionManager.withOnTriggerEventOnly(
+ () -> {
+
singleThreadMainThreadExecutor
+
.assertRunningInMainThread();
+
+ if (context instanceof
WaitingForResources) {
+
context.transitionToSubsequentState();
+ } else if (context
instanceof Executing) {
+ eventQueue.offer(
+
eventCounter.getAndIncrement());
+ }
+ }))
.setCheckpointStatsTrackerFactory(
(metricGroup, listener) -> {
assertThat(statsListenerInstantiatedFuture)
@@ -2366,6 +2471,30 @@ public class AdaptiveSchedulerTest {
forMainThread());
}
+ /**
+ * Creates a testing SlotPool instance that would allow for the scheduler
to transition to
+ * Executing state.
+ */
+ private static DeclarativeSlotPool getSlotPoolWithFreeSlots(int freeSlots)
{
+ return new TestingDeclarativeSlotPoolBuilder()
+ .setContainsFreeSlotFunction(allocationID -> true)
+ .setReserveFreeSlotFunction(
+ (allocationId, resourceProfile) ->
+ TestingPhysicalSlot.builder()
+ .withAllocationID(allocationId)
+ .build())
+ .setGetFreeSlotTrackerSupplier(
+ () ->
+ TestingFreeSlotTracker.newBuilder()
+ .setGetFreeSlotsInformationSupplier(
+ () ->
+ IntStream.range(0,
freeSlots)
+ .mapToObj(v ->
new TestingSlot())
+
.collect(Collectors.toSet()))
+ .build())
+ .build();
+ }
+
private static JobGraph createJobGraph() {
return streamingJobGraph(JOB_VERTEX);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
index ca90aa2a1c5..1fd9e5fd0fa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
@@ -18,11 +18,8 @@
package org.apache.flink.runtime.scheduler.adaptive;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.ScheduledTask;
import
org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling;
-import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;
import org.junit.jupiter.api.Test;
@@ -50,31 +47,6 @@ import static org.assertj.core.api.Assertions.assertThat;
class DefaultStateTransitionManagerTest {
- @Test
- void testProperConfiguration() throws ConfigurationException {
- final Duration cooldownTimeout = Duration.ofMillis(1337);
- final Duration resourceStabilizationTimeout = Duration.ofMillis(7331);
- final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
-
- final Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN,
cooldownTimeout);
- configuration.set(
- JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX,
resourceStabilizationTimeout);
- configuration.set(
- JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER,
maximumDelayForRescaleTrigger);
-
- final DefaultStateTransitionManager testInstance =
- DefaultStateTransitionManager.Factory.fromSettings(
- AdaptiveScheduler.Settings.of(configuration))
- .create(
-
TestingStateTransitionManagerContext.stableContext(),
- Instant.now());
- assertThat(testInstance.cooldownTimeout).isEqualTo(cooldownTimeout);
- assertThat(testInstance.resourceStabilizationTimeout)
- .isEqualTo(resourceStabilizationTimeout);
-
assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger);
- }
-
@Test
void testTriggerWithoutChangeEventNoopInCooldownPhase() {
final TestingStateTransitionManagerContext ctx =
@@ -623,13 +595,13 @@ class DefaultStateTransitionManagerTest {
@Nullable Duration resourceStabilizationTimeout) {
final DefaultStateTransitionManager testInstance =
new DefaultStateTransitionManager(
- initializationTime,
// clock that returns the time based on the
configured elapsedTime
() ->
Objects.requireNonNull(initializationTime).plus(elapsedTime),
this,
-
TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT,
+ COOLDOWN_TIMEOUT,
resourceStabilizationTimeout,
-
TestingStateTransitionManagerContext.MAX_TRIGGER_DELAY) {
+ MAX_TRIGGER_DELAY,
+ initializationTime) {
@Override
public void onChange() {
super.onChange();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index b49c1c9b35d..0fc8f9d2073 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -155,7 +156,7 @@ class ExecutingTest {
ctx,
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
- TestingStateTransitionManager.Factory.noOpFactory(),
+ (context, ts) -> TestingStateTransitionManager.withNoOp(),
1,
1,
Instant.now());
@@ -183,7 +184,7 @@ class ExecutingTest {
ctx,
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
-
TestingStateTransitionManager.Factory.noOpFactory(),
+ (context, ts) ->
TestingStateTransitionManager.withNoOp(),
1,
1,
Instant.now());
@@ -195,9 +196,12 @@ class ExecutingTest {
@Test
public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
final AtomicBoolean rescaleTriggered = new AtomicBoolean();
- final StateTransitionManager.Factory stateTransitionManagerFactory =
- new TestingStateTransitionManager.Factory(
- () -> {}, () -> rescaleTriggered.set(true));
+ final BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory =
+ (context, ts) ->
+
TestingStateTransitionManager.withOnTriggerEventOnly(
+ () -> rescaleTriggered.set(true));
+
try (MockExecutingContext ctx = new MockExecutingContext()) {
final Executing testInstance =
new ExecutingStateBuilder()
@@ -213,9 +217,12 @@ class ExecutingTest {
@Test
public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
final AtomicInteger rescaleTriggerCount = new AtomicInteger();
- final StateTransitionManager.Factory stateTransitionManagerFactory =
- new TestingStateTransitionManager.Factory(
- () -> {}, rescaleTriggerCount::incrementAndGet);
+ final BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory =
+ (context, ts) ->
+
TestingStateTransitionManager.withOnTriggerEventOnly(
+ rescaleTriggerCount::incrementAndGet);
+
final int rescaleOnFailedCheckpointsCount = 3;
try (MockExecutingContext ctx = new MockExecutingContext()) {
final Executing testInstance =
@@ -256,9 +263,12 @@ class ExecutingTest {
@Test
public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws
Exception {
final AtomicInteger rescaleTriggeredCount = new AtomicInteger();
- final StateTransitionManager.Factory stateTransitionManagerFactory =
- new TestingStateTransitionManager.Factory(
- () -> {}, rescaleTriggeredCount::incrementAndGet);
+ final BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory =
+ (context, ts) ->
+
TestingStateTransitionManager.withOnTriggerEventOnly(
+
rescaleTriggeredCount::incrementAndGet);
+
final int rescaleOnFailedCheckpointsCount = 3;
try (MockExecutingContext ctx = new MockExecutingContext()) {
final Executing testInstance =
@@ -585,9 +595,10 @@ class ExecutingTest {
try (MockExecutingContext ctx = new MockExecutingContext()) {
new ExecutingStateBuilder()
.setStateTransitionManagerFactory(
- new TestingStateTransitionManager.Factory(
- () -> actualEvents.add(onChangeEventLabel),
- () ->
actualEvents.add(onTriggerEventLabel)))
+ (context, ts) ->
+ new TestingStateTransitionManager(
+ () ->
actualEvents.add(onChangeEventLabel),
+ () ->
actualEvents.add(onTriggerEventLabel)))
.build(ctx);
ctx.triggerExecutors();
@@ -609,8 +620,9 @@ class ExecutingTest {
TestingDefaultExecutionGraphBuilder.newBuilder()
.build(EXECUTOR_EXTENSION.getExecutor());
private OperatorCoordinatorHandler operatorCoordinatorHandler;
- private StateTransitionManager.Factory stateTransitionManagerFactory =
- TestingStateTransitionManager.Factory.noOpFactory();
+ private BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory =
+ (context, ts) ->
TestingStateTransitionManager.withNoOp();
private int rescaleOnFailedCheckpointCount = 1;
private ExecutingStateBuilder() throws JobException,
JobExecutionException {
@@ -629,7 +641,8 @@ class ExecutingTest {
}
public ExecutingStateBuilder setStateTransitionManagerFactory(
- StateTransitionManager.Factory stateTransitionManagerFactory) {
+ BiFunction<StateTransitionManager.Context, Instant,
StateTransitionManager>
+ stateTransitionManagerFactory) {
this.stateTransitionManagerFactory = stateTransitionManagerFactory;
return this;
}
@@ -652,10 +665,10 @@ class ExecutingTest {
ctx,
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
- stateTransitionManagerFactory,
+ stateTransitionManagerFactory::apply,
1,
rescaleOnFailedCheckpointCount,
- // will be ignored by the TestingRescaleManager.Factory
+ // will be ignored by the
TestingStateTransitionManager.Factory
Instant.now());
} finally {
Preconditions.checkState(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
index 3f4573cd74e..ee3b35dda2b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
@@ -18,15 +18,25 @@
package org.apache.flink.runtime.scheduler.adaptive;
-import java.time.Instant;
-
/** Testing implementation for {@link StateTransitionManager}. */
public class TestingStateTransitionManager implements StateTransitionManager {
private final Runnable onChangeRunnable;
private final Runnable onTriggerRunnable;
- private TestingStateTransitionManager(Runnable onChangeRunnable, Runnable
onTriggerRunnable) {
+ public static TestingStateTransitionManager withNoOp() {
+ return withOnTriggerEventOnly(() -> {});
+ }
+
+ public static TestingStateTransitionManager withOnChangeEventOnly(Runnable
onChangeCallback) {
+ return new TestingStateTransitionManager(onChangeCallback, () -> {});
+ }
+
+ public static TestingStateTransitionManager
withOnTriggerEventOnly(Runnable onTriggerCallback) {
+ return new TestingStateTransitionManager(() -> {}, onTriggerCallback);
+ }
+
+ public TestingStateTransitionManager(Runnable onChangeRunnable, Runnable
onTriggerRunnable) {
this.onChangeRunnable = onChangeRunnable;
this.onTriggerRunnable = onTriggerRunnable;
}
@@ -40,27 +50,4 @@ public class TestingStateTransitionManager implements
StateTransitionManager {
public void onTrigger() {
this.onTriggerRunnable.run();
}
-
- /**
- * {@code Factory} implementation for creating {@code
TestingStateTransitionManager} instances.
- */
- public static class Factory implements StateTransitionManager.Factory {
-
- private final Runnable onChangeRunnable;
- private final Runnable onTriggerRunnable;
-
- public static TestingStateTransitionManager.Factory noOpFactory() {
- return new Factory(() -> {}, () -> {});
- }
-
- public Factory(Runnable onChangeRunnable, Runnable onTriggerRunnable) {
- this.onChangeRunnable = onChangeRunnable;
- this.onTriggerRunnable = onTriggerRunnable;
- }
-
- @Override
- public StateTransitionManager create(Context ignoredContext, Instant
ignoredLastRescale) {
- return new TestingStateTransitionManager(onChangeRunnable,
onTriggerRunnable);
- }
- }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index a4a0dbec38a..a7b40cc50bf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.junit.jupiter.api.Test;
@@ -38,7 +37,9 @@ import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,162 +50,154 @@ class WaitingForResourcesTest {
private static final Logger LOG =
LoggerFactory.getLogger(WaitingForResourcesTest.class);
- private static final Duration STABILIZATION_TIMEOUT =
Duration.ofSeconds(1);
+ private static final Duration DISABLED_RESOURCE_WAIT_TIMEOUT =
Duration.ofSeconds(-1);
@RegisterExtension private MockContext ctx = new MockContext();
/** WaitingForResources is transitioning to Executing if there are enough
resources. */
@Test
void testTransitionToCreatingExecutionGraph() {
- ctx.setHasDesiredResources(() -> true);
+ final AtomicBoolean onTriggerCalled = new AtomicBoolean();
+ final Function<StateTransitionManager.Context, StateTransitionManager>
+ stateTransitionManagerFactory =
+ context ->
+ new TestingStateTransitionManager(
+ () -> {
+
assertThat(context.hasDesiredResources()).isTrue();
+
assertThat(context.hasSufficientResources()).isTrue();
+
+
context.transitionToSubsequentState();
+ },
+ () -> onTriggerCalled.set(true));
+ ctx.setHasDesiredResources(() -> true);
+ ctx.setHasSufficientResources(() -> true);
ctx.setExpectCreatingExecutionGraph();
- new WaitingForResources(ctx, LOG, Duration.ZERO,
STABILIZATION_TIMEOUT);
+ new WaitingForResources(
+ ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT,
stateTransitionManagerFactory);
ctx.runScheduledTasks();
+
+ assertThat(onTriggerCalled.get()).isTrue();
}
@Test
void testNotEnoughResources() {
+ final AtomicBoolean onChangeCalled = new AtomicBoolean();
+ final AtomicBoolean onTriggerCalled = new AtomicBoolean();
+ final Function<StateTransitionManager.Context, StateTransitionManager>
+ stateTransitionManagerFactory =
+ context ->
+ new TestingStateTransitionManager(
+ () -> {
+ onChangeCalled.set(true);
+
+
assertThat(context.hasDesiredResources()).isFalse();
+
assertThat(context.hasSufficientResources()).isFalse();
+ },
+ () -> onTriggerCalled.set(true));
+
ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
WaitingForResources wfr =
- new WaitingForResources(ctx, LOG, Duration.ZERO,
STABILIZATION_TIMEOUT);
+ new WaitingForResources(
+ ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT,
stateTransitionManagerFactory);
+ ctx.runScheduledTasks();
// we expect no state transition.
- wfr.onNewResourcesAvailable();
+ assertThat(ctx.hasStateTransition()).isFalse();
+ assertThat(onChangeCalled.get()).isTrue();
+ assertThat(onTriggerCalled.get()).isTrue();
}
@Test
void testNotifyNewResourcesAvailable() {
- ctx.setHasDesiredResources(() -> false); // initially, not enough
resources
- WaitingForResources wfr =
- new WaitingForResources(ctx, LOG, Duration.ZERO,
STABILIZATION_TIMEOUT);
- ctx.setHasDesiredResources(() -> true); // make resources available
- ctx.setExpectCreatingExecutionGraph();
- wfr.onNewResourcesAvailable(); // .. and notify
- }
+ final AtomicInteger callsCounter = new AtomicInteger();
+ final Function<StateTransitionManager.Context, StateTransitionManager>
+ stateTransitionManagerFactory =
+ context ->
+
TestingStateTransitionManager.withOnChangeEventOnly(
+ () -> {
+ if (callsCounter.incrementAndGet()
== 0) {
+ // initially, not enough
resources
+
assertThat(context.hasDesiredResources()).isFalse();
+
assertThat(context.hasSufficientResources())
+ .isFalse();
+ }
+
+ if (context.hasDesiredResources()
+ &&
context.hasSufficientResources()) {
+
context.transitionToSubsequentState();
+ }
+ });
+
+ // initially, not enough resources
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
- @Test
- void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() {
- Duration noStabilizationTimeout = Duration.ofMillis(0);
WaitingForResources wfr =
- new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000),
noStabilizationTimeout);
+ new WaitingForResources(
+ ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT,
stateTransitionManagerFactory);
+ ctx.runScheduledTasks();
- ctx.setHasDesiredResources(() -> false);
+ // make resources available
+ ctx.setHasDesiredResources(() -> true);
ctx.setHasSufficientResources(() -> true);
ctx.setExpectCreatingExecutionGraph();
- wfr.onNewResourcesAvailable();
+ wfr.onNewResourcesAvailable(); // .. and notify
}
@Test
- void testNoSchedulingIfStabilizationTimeoutIsConfigured() {
- Duration stabilizationTimeout = Duration.ofMillis(50000);
-
+ void testSchedulingWithSufficientResources() {
+ final Function<StateTransitionManager.Context, StateTransitionManager>
+ stateTransitionManagerFactory =
+ context ->
+
TestingStateTransitionManager.withOnChangeEventOnly(
+ () -> {
+
assertThat(context.hasDesiredResources()).isFalse();
+ if
(context.hasSufficientResources()) {
+
context.transitionToSubsequentState();
+ }
+ });
WaitingForResources wfr =
- new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000),
stabilizationTimeout);
-
- ctx.setHasDesiredResources(() -> false);
- ctx.setHasSufficientResources(() -> true);
- wfr.onNewResourcesAvailable();
- // we are not triggering the scheduled tasks, to simulate a long
stabilization timeout
-
+ new WaitingForResources(
+ ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT,
stateTransitionManagerFactory);
+ ctx.runScheduledTasks();
+ // we expect no state transition.
assertThat(ctx.hasStateTransition()).isFalse();
- }
- @Test
- void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() {
- Duration initialResourceTimeout = Duration.ofMillis(-1);
- Duration stabilizationTimeout = Duration.ofMillis(50_000L);
-
- WaitingForResources wfr =
- new WaitingForResources(
- ctx,
- LOG,
- initialResourceTimeout,
- stabilizationTimeout,
- ctx.getClock(),
- null);
- // sufficient resources available
ctx.setHasDesiredResources(() -> false);
ctx.setHasSufficientResources(() -> true);
-
- // notify about sufficient resources
- wfr.onNewResourcesAvailable();
-
ctx.setExpectCreatingExecutionGraph();
-
- // execute all runnables and trigger expected state transition
- final Duration afterStabilizationTimeout =
stabilizationTimeout.plusMillis(1);
- ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis());
-
- ctx.runScheduledTasks(afterStabilizationTimeout.toMillis());
-
- assertThat(ctx.hasStateTransition()).isTrue();
+ wfr.onNewResourcesAvailable();
}
@Test
- void testStabilizationTimeoutReset() {
- Duration initialResourceTimeout = Duration.ofMillis(-1);
- Duration stabilizationTimeout = Duration.ofMillis(50L);
+ void testNoStateTransitionOnNoResourceTimeout() {
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
WaitingForResources wfr =
new WaitingForResources(
ctx,
LOG,
- initialResourceTimeout,
- stabilizationTimeout,
- ctx.getClock(),
- null);
-
- ctx.setHasDesiredResources(() -> false);
-
- // notify about resources, trigger stabilization timeout
- ctx.setHasSufficientResources(() -> true);
- ctx.advanceTimeByMillis(40); // advance time, but don't trigger
stabilizationTimeout
- wfr.onNewResourcesAvailable();
-
- // notify again, but insufficient (reset stabilization timeout)
- ctx.setHasSufficientResources(() -> false);
- ctx.advanceTimeByMillis(40);
- wfr.onNewResourcesAvailable();
-
- // notify again, but sufficient, trigger timeout
- ctx.setHasSufficientResources(() -> true);
- ctx.advanceTimeByMillis(40);
- wfr.onNewResourcesAvailable();
-
- // sanity check: no state transition has been triggered so far
- assertThat(ctx.hasStateTransition()).isFalse();
- assertThat(ctx.getTestDuration()).isGreaterThan(stabilizationTimeout);
-
- ctx.setExpectCreatingExecutionGraph();
-
- ctx.advanceTimeByMillis(1);
- assertThat(ctx.hasStateTransition()).isFalse();
-
- ctx.advanceTimeByMillis(stabilizationTimeout.toMillis());
- assertThat(ctx.hasStateTransition()).isTrue();
- }
-
- @Test
- void testNoStateTransitionOnNoResourceTimeout() {
- ctx.setHasDesiredResources(() -> false);
- WaitingForResources wfr =
- new WaitingForResources(ctx, LOG, Duration.ofMillis(-1),
STABILIZATION_TIMEOUT);
-
+ DISABLED_RESOURCE_WAIT_TIMEOUT,
+ context -> TestingStateTransitionManager.withNoOp());
ctx.runScheduledTasks();
+
assertThat(ctx.hasStateTransition()).isFalse();
}
@Test
void testStateTransitionOnResourceTimeout() {
- ctx.setHasDesiredResources(() -> false);
WaitingForResources wfr =
- new WaitingForResources(ctx, LOG, Duration.ZERO,
STABILIZATION_TIMEOUT);
-
+ new WaitingForResources(
+ ctx,
+ LOG,
+ Duration.ZERO,
+ context -> TestingStateTransitionManager.withNoOp());
ctx.setExpectCreatingExecutionGraph();
-
ctx.runScheduledTasks();
}
@@ -301,10 +294,7 @@ class WaitingForResourcesTest {
new PriorityQueue<>(
Comparator.comparingLong(o ->
o.getDelay(TimeUnit.MILLISECONDS)));
- private final ManualTestTime testTime =
- new ManualTestTime(
- (durationSinceTestStart) ->
-
runScheduledTasks(durationSinceTestStart.toMillis()));
+ private final ManualClock testingClock = new ManualClock();
public void setHasDesiredResources(Supplier<Boolean> sup) {
hasDesiredResourcesSupplier = sup;
@@ -361,7 +351,7 @@ class WaitingForResourcesTest {
LOG.info(
"Scheduling work with delay {} for earliest execution at
{}",
delay.toMillis(),
- testTime.getClock().absoluteTimeMillis() +
delay.toMillis());
+ testingClock.absoluteTimeMillis() + delay.toMillis());
final ScheduledTask<Void> scheduledTask =
new ScheduledTask<>(
() -> {
@@ -371,7 +361,7 @@ class WaitingForResourcesTest {
return null;
},
- testTime.getClock().absoluteTimeMillis() +
delay.toMillis());
+ testingClock.absoluteTimeMillis() +
delay.toMillis());
scheduledTasks.add(scheduledTask);
@@ -383,51 +373,6 @@ class WaitingForResourcesTest {
creatingExecutionGraphStateValidator.validateInput(null);
registerStateTransition();
}
-
- public Clock getClock() {
- return testTime.getClock();
- }
-
- public void advanceTimeByMillis(long millis) {
- testTime.advanceMillis(millis);
- }
-
- public Duration getTestDuration() {
- return testTime.getTestDuration();
- }
- }
-
- private static final class ManualTestTime {
- private static final Logger LOG =
LoggerFactory.getLogger(ManualTestTime.class);
-
- private final ManualClock testingClock = new ManualClock();
- private final Consumer<Duration> runOnAdvance;
-
- private Duration durationSinceTestStart = Duration.ZERO;
-
- private ManualTestTime(Consumer<Duration> runOnAdvance) {
- this.runOnAdvance = runOnAdvance;
- }
-
- private Clock getClock() {
- return testingClock;
- }
-
- public void advanceMillis(long millis) {
- durationSinceTestStart = durationSinceTestStart.plusMillis(millis);
-
- LOG.info(
- "Advance testing time by {} ms to time {} ms",
- millis,
- durationSinceTestStart.toMillis());
-
- testingClock.advanceTime(millis, TimeUnit.MILLISECONDS);
- runOnAdvance.accept(durationSinceTestStart);
- }
-
- public Duration getTestDuration() {
- return durationSinceTestStart;
- }
}
static <T> Consumer<T> assertNonNull() {