This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92d6932ac38780e90bf9e0d659f46dd9e9b26f0e
Author: David Moravek <[email protected]>
AuthorDate: Mon Nov 6 13:42:58 2023 +0100

    [FLINK-33976] Consolidate AdaptiveScheduler options into 
AdaptiveSchduler#Settings object.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 93 ++++++++++++++++------
 .../adaptive/AdaptiveSchedulerFactory.java         |  3 +-
 .../runtime/scheduler/adaptive/Executing.java      |  4 +-
 .../adaptive/AdaptiveSchedulerBuilder.java         |  4 +-
 4 files changed, 74 insertions(+), 30 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index d952848f7c1..c881b9c5f7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -179,6 +179,68 @@ public class AdaptiveScheduler
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveScheduler.class);
 
+    /**
+     * Consolidated settings for the adaptive scheduler. This class is used to 
avoid passing around
+     * multiple config options.
+     */
+    public static class Settings {
+
+        public static Settings of(Configuration configuration) {
+            final SchedulerExecutionMode executionMode =
+                    configuration.get(JobManagerOptions.SCHEDULER_MODE);
+            Duration allocationTimeoutDefault =
+                    JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue();
+            Duration stabilizationTimeoutDefault =
+                    
JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue();
+            if (executionMode == SchedulerExecutionMode.REACTIVE) {
+                allocationTimeoutDefault = Duration.ofMillis(-1);
+                stabilizationTimeoutDefault = Duration.ZERO;
+            }
+            return new Settings(
+                    executionMode,
+                    configuration
+                            
.getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)
+                            .orElse(allocationTimeoutDefault),
+                    configuration
+                            
.getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)
+                            .orElse(stabilizationTimeoutDefault),
+                    
Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)));
+        }
+
+        private final SchedulerExecutionMode executionMode;
+        private final Duration initialResourceAllocationTimeout;
+        private final Duration resourceStabilizationTimeout;
+        private final Duration slotIdleTimeout;
+
+        public Settings(
+                SchedulerExecutionMode executionMode,
+                Duration initialResourceAllocationTimeout,
+                Duration resourceStabilizationTimeout,
+                Duration slotIdleTimeout) {
+            this.executionMode = executionMode;
+            this.initialResourceAllocationTimeout = 
initialResourceAllocationTimeout;
+            this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+            this.slotIdleTimeout = slotIdleTimeout;
+        }
+
+        public SchedulerExecutionMode getExecutionMode() {
+            return executionMode;
+        }
+
+        public Duration getInitialResourceAllocationTimeout() {
+            return initialResourceAllocationTimeout;
+        }
+
+        public Duration getResourceStabilizationTimeout() {
+            return resourceStabilizationTimeout;
+        }
+
+        public Duration getSlotIdleTimeout() {
+            return slotIdleTimeout;
+        }
+    }
+
+    private final Settings settings;
     private final JobGraph jobGraph;
 
     private final JobInfo jobInfo;
@@ -212,10 +274,6 @@ public class AdaptiveScheduler
 
     private final RescalingController forceRescalingController;
 
-    private final Duration initialResourceAllocationTimeout;
-
-    private final Duration resourceStabilizationTimeout;
-
     private final ExecutionGraphFactory executionGraphFactory;
 
     private State state = new Created(this, LOG);
@@ -229,8 +287,6 @@ public class AdaptiveScheduler
 
     private BackgroundTask<ExecutionGraph> backgroundTask = 
BackgroundTask.finishedBackgroundTask();
 
-    private final SchedulerExecutionMode executionMode;
-
     private final DeploymentStateTimeMetrics deploymentTimeMetrics;
 
     private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
@@ -239,9 +295,8 @@ public class AdaptiveScheduler
 
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
-    private final Duration slotIdleTimeout;
-
     public AdaptiveScheduler(
+            Settings settings,
             JobGraph jobGraph,
             @Nullable JobResourceRequirements jobResourceRequirements,
             Configuration configuration,
@@ -251,8 +306,6 @@ public class AdaptiveScheduler
             ClassLoader userCodeClassLoader,
             CheckpointsCleaner checkpointsCleaner,
             CheckpointRecoveryFactory checkpointRecoveryFactory,
-            Duration initialResourceAllocationTimeout,
-            Duration resourceStabilizationTimeout,
             JobManagerJobMetricGroup jobManagerJobMetricGroup,
             RestartBackoffTimeStrategy restartBackoffTimeStrategy,
             long initializationTimestamp,
@@ -265,12 +318,12 @@ public class AdaptiveScheduler
 
         assertPreconditions(jobGraph);
 
+        this.settings = settings;
         this.jobGraph = jobGraph;
         this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), 
jobGraph.getName());
-        this.executionMode = 
configuration.get(JobManagerOptions.SCHEDULER_MODE);
 
         VertexParallelismStore vertexParallelismStore =
-                computeVertexParallelismStore(jobGraph, executionMode);
+                computeVertexParallelismStore(jobGraph, 
settings.getExecutionMode());
         if (jobResourceRequirements != null) {
             vertexParallelismStore =
                     DefaultVertexParallelismStore.applyJobResourceRequirements(
@@ -305,10 +358,6 @@ public class AdaptiveScheduler
 
         this.forceRescalingController = new 
EnforceParallelismChangeRescalingController();
 
-        this.initialResourceAllocationTimeout = 
initialResourceAllocationTimeout;
-
-        this.resourceStabilizationTimeout = resourceStabilizationTimeout;
-
         this.executionGraphFactory = executionGraphFactory;
 
         final JobStatusStore jobStatusStore = new 
JobStatusStore(initializationTimestamp);
@@ -336,8 +385,6 @@ public class AdaptiveScheduler
         this.exceptionHistory =
                 new 
BoundedFIFOQueue<>(configuration.get(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
         this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
-        this.slotIdleTimeout =
-                
Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT));
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws 
RuntimeException {
@@ -819,7 +866,7 @@ public class AdaptiveScheduler
 
     @Override
     public void updateJobResourceRequirements(JobResourceRequirements 
jobResourceRequirements) {
-        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+        if (settings.getExecutionMode() == SchedulerExecutionMode.REACTIVE) {
             throw new UnsupportedOperationException(
                     "Cannot change the parallelism of a job running in 
reactive mode.");
         }
@@ -910,8 +957,8 @@ public class AdaptiveScheduler
                 new WaitingForResources.Factory(
                         this,
                         LOG,
-                        this.initialResourceAllocationTimeout,
-                        this.resourceStabilizationTimeout,
+                        settings.getInitialResourceAllocationTimeout(),
+                        settings.getResourceStabilizationTimeout(),
                         previousExecutionGraph));
     }
 
@@ -1078,7 +1125,7 @@ public class AdaptiveScheduler
             adjustedParallelismStore =
                     computeVertexParallelismStoreForExecution(
                             adjustedJobGraph,
-                            executionMode,
+                            settings.getExecutionMode(),
                             (vertex) -> {
                                 VertexParallelismInformation 
vertexParallelismInfo =
                                         
initialParallelismStore.getParallelismInfo(vertex.getID());
@@ -1358,7 +1405,7 @@ public class AdaptiveScheduler
         getMainThreadExecutor()
                 .schedule(
                         this::checkIdleSlotTimeout,
-                        slotIdleTimeout.toMillis(),
+                        settings.getSlotIdleTimeout().toMillis(),
                         TimeUnit.MILLISECONDS);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 7e3199d2398..9e25ab31678 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -127,6 +127,7 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
                         partitionTracker);
 
         return new AdaptiveScheduler(
+                AdaptiveScheduler.Settings.of(jobMasterConfiguration),
                 jobGraph,
                 
JobResourceRequirements.readFromJobGraph(jobGraph).orElse(null),
                 jobMasterConfiguration,
@@ -136,8 +137,6 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
                 userCodeLoader,
                 new CheckpointsCleaner(),
                 checkpointRecoveryFactory,
-                initialResourceAllocationTimeout,
-                resourceStabilizationTimeout,
                 jobManagerJobMetricGroup,
                 restartBackoffTimeStrategy,
                 initializationTimestamp,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 10f872bbbd5..8298d344711 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
@@ -243,8 +244,7 @@ class Executing extends StateWithExecutionGraph implements 
ResourceListener {
         schedulingProvider.stopCheckpointScheduler();
 
         final CompletableFuture<String> savepointFuture =
-                executionGraph
-                        .getCheckpointCoordinator()
+                
Objects.requireNonNull(executionGraph.getCheckpointCoordinator())
                         .triggerSynchronousSavepoint(terminate, 
targetDirectory, formatType)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
         return context.goToStopWithSavepoint(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index f17c9c1ccc5..b814144ad96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
@@ -215,6 +214,7 @@ public class AdaptiveSchedulerBuilder {
                         partitionTracker);
 
         return new AdaptiveScheduler(
+                AdaptiveScheduler.Settings.of(jobMasterConfiguration),
                 jobGraph,
                 jobResourceRequirements,
                 jobMasterConfiguration,
@@ -227,8 +227,6 @@ public class AdaptiveSchedulerBuilder {
                 userCodeLoader,
                 checkpointsCleaner,
                 checkpointRecoveryFactory,
-                
jobMasterConfiguration.get(JobManagerOptions.RESOURCE_WAIT_TIMEOUT),
-                
jobMasterConfiguration.get(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT),
                 jobManagerJobMetricGroup,
                 restartBackoffTimeStrategy,
                 initializationTimestamp,

Reply via email to