This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e04a2326126bd7cfbf883baba5313914335e411f
Author: David Moravek <[email protected]>
AuthorDate: Mon Nov 6 14:00:15 2023 +0100

    [FLINK-33976] Populate `scalingInterval{Min,Max}` options from the JobMaster's configuration instead of a Job configuration.
---
 .../scheduler/adaptive/AdaptiveScheduler.java     | 43 ++++++++++++++++--
 .../runtime/scheduler/adaptive/Executing.java     | 19 +++++---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java | 51 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index c881b9c5f7d..279bb38def1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -196,6 +196,25 @@ public class AdaptiveScheduler
                 allocationTimeoutDefault = Duration.ofMillis(-1);
                 stabilizationTimeoutDefault = Duration.ZERO;
             }
+
+            final Duration scalingIntervalMin =
+                    configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN);
+            final Duration scalingIntervalMax =
+                    configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX);
+            Preconditions.checkState(
+                    !scalingIntervalMin.isNegative(),
+                    "%s must be positive integer or 0",
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+            if (scalingIntervalMax != null) {
+                Preconditions.checkState(
+                        scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+                        "%s(%d) must be greater than %s(%d)",
+                        JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                        scalingIntervalMax,
+                        JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+                        scalingIntervalMin);
+            }
+
             return new Settings(
                     executionMode,
                     configuration
@@ -204,23 +223,31 @@ public class AdaptiveScheduler
                     configuration
                             .getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)
                             .orElse(stabilizationTimeoutDefault),
-                    Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)));
+                    Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)),
+                    scalingIntervalMin,
+                    scalingIntervalMax);
         }

         private final SchedulerExecutionMode executionMode;
         private final Duration initialResourceAllocationTimeout;
         private final Duration resourceStabilizationTimeout;
         private final Duration slotIdleTimeout;
+        private final Duration scalingIntervalMin;
+        private final Duration scalingIntervalMax;

         public Settings(
                 SchedulerExecutionMode executionMode,
                 Duration initialResourceAllocationTimeout,
                 Duration resourceStabilizationTimeout,
-                Duration slotIdleTimeout) {
+                Duration slotIdleTimeout,
+                Duration scalingIntervalMin,
+                Duration scalingIntervalMax) {
             this.executionMode = executionMode;
             this.initialResourceAllocationTimeout = initialResourceAllocationTimeout;
             this.resourceStabilizationTimeout = resourceStabilizationTimeout;
             this.slotIdleTimeout = slotIdleTimeout;
+            this.scalingIntervalMin = scalingIntervalMin;
+            this.scalingIntervalMax = scalingIntervalMax;
         }

         public SchedulerExecutionMode getExecutionMode() {
@@ -238,6 +265,14 @@ public class AdaptiveScheduler
         public Duration getSlotIdleTimeout() {
             return slotIdleTimeout;
         }
+
+        public Duration getScalingIntervalMin() {
+            return scalingIntervalMin;
+        }
+
+        public Duration getScalingIntervalMax() {
+            return scalingIntervalMax;
+        }
     }

     private final Settings settings;
@@ -989,7 +1024,9 @@ public class AdaptiveScheduler
                         LOG,
                         this,
                         userCodeClassLoader,
-                        failureCollection));
+                        failureCollection,
+                        settings.getScalingIntervalMin(),
+                        settings.getScalingIntervalMax()));
     }

     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 8298d344711..257883f8123 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -18,8 +18,8 @@

 package org.apache.flink.runtime.scheduler.adaptive;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.JobException;
@@ -54,8 +54,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
     private final Instant lastRescale;
     // only one schedule at the time
     private boolean rescaleScheduled = false;
-    private final Duration scalingIntervalMin;
-    @Nullable private final Duration scalingIntervalMax;
+    @VisibleForTesting final Duration scalingIntervalMin;
+    @VisibleForTesting @Nullable final Duration scalingIntervalMax;

     Executing(
             ExecutionGraph executionGraph,
@@ -302,6 +302,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         private final OperatorCoordinatorHandler operatorCoordinatorHandler;
         private final ClassLoader userCodeClassLoader;
         private final List<ExceptionHistoryEntry> failureCollection;
+        private final Duration scalingIntervalMin;
+        private final Duration scalingIntervalMax;

         Factory(
                 ExecutionGraph executionGraph,
@@ -310,7 +312,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
                 Logger log,
                 Context context,
                 ClassLoader userCodeClassLoader,
-                List<ExceptionHistoryEntry> failureCollection) {
+                List<ExceptionHistoryEntry> failureCollection,
+                Duration scalingIntervalMin,
+                Duration scalingIntervalMax) {
             this.context = context;
             this.log = log;
             this.executionGraph = executionGraph;
@@ -318,6 +322,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
             this.operatorCoordinatorHandler = operatorCoordinatorHandler;
             this.userCodeClassLoader = userCodeClassLoader;
             this.failureCollection = failureCollection;
+            this.scalingIntervalMin = scalingIntervalMin;
+            this.scalingIntervalMax = scalingIntervalMax;
         }

         public Class<Executing> getStateClass() {
@@ -325,7 +331,6 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         }

         public Executing getState() {
-            final Configuration jobConfiguration = executionGraph.getJobConfiguration();
             return new Executing(
                     executionGraph,
                     executionGraphHandler,
@@ -334,8 +339,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
                     context,
                     userCodeClassLoader,
                     failureCollection,
-                    jobConfiguration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN),
-                    jobConfiguration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX),
+                    scalingIntervalMin,
+                    scalingIntervalMax,
                     Instant.now());
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index a1f8a2643ec..641094ab100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -2074,6 +2074,57 @@ public class AdaptiveSchedulerTest {
                 .isEqualTo(newJobResourceRequirements2);
     }

+    @Test
+    public void testScalingIntervalConfigurationIsRespected() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Duration scalingIntervalMin = Duration.ofMillis(1337);
+        final Duration scalingIntervalMax = Duration.ofMillis(7331);
+        final Configuration configuration = createConfigurationWithNoTimeouts();
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, scalingIntervalMin);
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, scalingIntervalMax);
+
+        final AdaptiveScheduler scheduler =
+                prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build();
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(PARALLELISM);
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM);
+
+        // Wait for all tasks to be submitted
+        taskManagerGateway.waitForSubmissions(PARALLELISM);
+
+        final CompletableFuture<Executing> executingFuture = new CompletableFuture<>();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    final Optional<Executing> maybeExecuting =
+                            scheduler.getState().as(Executing.class);
+                    if (maybeExecuting.isPresent()) {
+                        executingFuture.complete(maybeExecuting.get());
+                    } else {
+                        executingFuture.completeExceptionally(
+                                new IllegalStateException(
+                                        String.format("State is not [%s].", Executing.class)));
+                    }
+                });
+        assertThatFuture(executingFuture)
+                .eventuallySucceeds()
+                .satisfies(
+                        executing -> {
+                            assertThat(executing.scalingIntervalMin).isEqualTo(scalingIntervalMin);
+                            assertThat(executing.scalingIntervalMax).isEqualTo(scalingIntervalMax);
+                        });
+
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        singleThreadMainThreadExecutor.execute(
+                () -> FutureUtils.forward(scheduler.closeAsync(), closeFuture));
+        assertThatFuture(closeFuture).eventuallySucceeds();
+    }
+
     // ---------------------------------------------------------------------------------------------
     // Utils
     // ---------------------------------------------------------------------------------------------
