This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e04a2326126bd7cfbf883baba5313914335e411f
Author: David Moravek <[email protected]>
AuthorDate: Mon Nov 6 14:00:15 2023 +0100

    [FLINK-33976] Populate `scalingInterval{Min,Max}` options from the JobMaster's configuration instead of a Job configuration.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 43 ++++++++++++++++--
 .../runtime/scheduler/adaptive/Executing.java      | 19 +++++---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 51 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index c881b9c5f7d..279bb38def1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -196,6 +196,25 @@ public class AdaptiveScheduler
                 allocationTimeoutDefault = Duration.ofMillis(-1);
                 stabilizationTimeoutDefault = Duration.ZERO;
             }
+
+            final Duration scalingIntervalMin =
+                    
configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN);
+            final Duration scalingIntervalMax =
+                    
configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX);
+            Preconditions.checkState(
+                    !scalingIntervalMin.isNegative(),
+                    "%s must be positive integer or 0",
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+            if (scalingIntervalMax != null) {
+                Preconditions.checkState(
+                        scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+                        "%s(%d) must be greater than %s(%d)",
+                        JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                        scalingIntervalMax,
+                        JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+                        scalingIntervalMin);
+            }
+
             return new Settings(
                     executionMode,
                     configuration
@@ -204,23 +223,31 @@ public class AdaptiveScheduler
                     configuration
                             
.getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)
                             .orElse(stabilizationTimeoutDefault),
-                    
Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)));
+                    
Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)),
+                    scalingIntervalMin,
+                    scalingIntervalMax);
         }
 
         private final SchedulerExecutionMode executionMode;
         private final Duration initialResourceAllocationTimeout;
         private final Duration resourceStabilizationTimeout;
         private final Duration slotIdleTimeout;
+        private final Duration scalingIntervalMin;
+        private final Duration scalingIntervalMax;
 
         public Settings(
                 SchedulerExecutionMode executionMode,
                 Duration initialResourceAllocationTimeout,
                 Duration resourceStabilizationTimeout,
-                Duration slotIdleTimeout) {
+                Duration slotIdleTimeout,
+                Duration scalingIntervalMin,
+                Duration scalingIntervalMax) {
             this.executionMode = executionMode;
             this.initialResourceAllocationTimeout = 
initialResourceAllocationTimeout;
             this.resourceStabilizationTimeout = resourceStabilizationTimeout;
             this.slotIdleTimeout = slotIdleTimeout;
+            this.scalingIntervalMin = scalingIntervalMin;
+            this.scalingIntervalMax = scalingIntervalMax;
         }
 
         public SchedulerExecutionMode getExecutionMode() {
@@ -238,6 +265,14 @@ public class AdaptiveScheduler
         public Duration getSlotIdleTimeout() {
             return slotIdleTimeout;
         }
+
+        public Duration getScalingIntervalMin() {
+            return scalingIntervalMin;
+        }
+
+        public Duration getScalingIntervalMax() {
+            return scalingIntervalMax;
+        }
     }
 
     private final Settings settings;
@@ -989,7 +1024,9 @@ public class AdaptiveScheduler
                         LOG,
                         this,
                         userCodeClassLoader,
-                        failureCollection));
+                        failureCollection,
+                        settings.getScalingIntervalMin(),
+                        settings.getScalingIntervalMax()));
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 8298d344711..257883f8123 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.JobException;
@@ -54,8 +54,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
     private final Instant lastRescale;
     // only one schedule at the time
     private boolean rescaleScheduled = false;
-    private final Duration scalingIntervalMin;
-    @Nullable private final Duration scalingIntervalMax;
+    @VisibleForTesting final Duration scalingIntervalMin;
+    @VisibleForTesting @Nullable final Duration scalingIntervalMax;
 
     Executing(
             ExecutionGraph executionGraph,
@@ -302,6 +302,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         private final OperatorCoordinatorHandler operatorCoordinatorHandler;
         private final ClassLoader userCodeClassLoader;
         private final List<ExceptionHistoryEntry> failureCollection;
+        private final Duration scalingIntervalMin;
+        private final Duration scalingIntervalMax;
 
         Factory(
                 ExecutionGraph executionGraph,
@@ -310,7 +312,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
                 Logger log,
                 Context context,
                 ClassLoader userCodeClassLoader,
-                List<ExceptionHistoryEntry> failureCollection) {
+                List<ExceptionHistoryEntry> failureCollection,
+                Duration scalingIntervalMin,
+                Duration scalingIntervalMax) {
             this.context = context;
             this.log = log;
             this.executionGraph = executionGraph;
@@ -318,6 +322,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
             this.operatorCoordinatorHandler = operatorCoordinatorHandler;
             this.userCodeClassLoader = userCodeClassLoader;
             this.failureCollection = failureCollection;
+            this.scalingIntervalMin = scalingIntervalMin;
+            this.scalingIntervalMax = scalingIntervalMax;
         }
 
         public Class<Executing> getStateClass() {
@@ -325,7 +331,6 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         }
 
         public Executing getState() {
-            final Configuration jobConfiguration = 
executionGraph.getJobConfiguration();
             return new Executing(
                     executionGraph,
                     executionGraphHandler,
@@ -334,8 +339,8 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
                     context,
                     userCodeClassLoader,
                     failureCollection,
-                    
jobConfiguration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN),
-                    
jobConfiguration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX),
+                    scalingIntervalMin,
+                    scalingIntervalMax,
                     Instant.now());
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index a1f8a2643ec..641094ab100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -2074,6 +2074,57 @@ public class AdaptiveSchedulerTest {
                 .isEqualTo(newJobResourceRequirements2);
     }
 
+    @Test
+    public void testScalingIntervalConfigurationIsRespected() throws Exception 
{
+        final JobGraph jobGraph = createJobGraph();
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Duration scalingIntervalMin = Duration.ofMillis(1337);
+        final Duration scalingIntervalMax = Duration.ofMillis(7331);
+        final Configuration configuration = 
createConfigurationWithNoTimeouts();
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, 
scalingIntervalMin);
+        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, 
scalingIntervalMax);
+
+        final AdaptiveScheduler scheduler =
+                prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build();
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(PARALLELISM);
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, 
PARALLELISM);
+
+        // Wait for all tasks to be submitted
+        taskManagerGateway.waitForSubmissions(PARALLELISM);
+
+        final CompletableFuture<Executing> executingFuture = new 
CompletableFuture<>();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    final Optional<Executing> maybeExecuting =
+                            scheduler.getState().as(Executing.class);
+                    if (maybeExecuting.isPresent()) {
+                        executingFuture.complete(maybeExecuting.get());
+                    } else {
+                        executingFuture.completeExceptionally(
+                                new IllegalStateException(
+                                        String.format("State is not [%s].", 
Executing.class)));
+                    }
+                });
+        assertThatFuture(executingFuture)
+                .eventuallySucceeds()
+                .satisfies(
+                        executing -> {
+                            
assertThat(executing.scalingIntervalMin).isEqualTo(scalingIntervalMin);
+                            
assertThat(executing.scalingIntervalMax).isEqualTo(scalingIntervalMax);
+                        });
+
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        singleThreadMainThreadExecutor.execute(
+                () -> FutureUtils.forward(scheduler.closeAsync(), 
closeFuture));
+        assertThatFuture(closeFuture).eventuallySucceeds();
+    }
+
     // 
---------------------------------------------------------------------------------------------
     // Utils
     // 
---------------------------------------------------------------------------------------------

Reply via email to