This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ae48935e665cf2e7dc3b1e65b2968caa40cf348
Author: David Moravek <[email protected]>
AuthorDate: Tue Jan 16 18:42:59 2024 +0100

    fixup! [FLINK-33976] Consolidate AdaptiveScheduler options into 
AdaptiveScheduler#Settings object.
---
 .../DefaultSlotPoolServiceSchedulerFactory.java    | 28 +---------------------
 .../scheduler/adaptive/AdaptiveScheduler.java      |  2 +-
 .../adaptive/AdaptiveSchedulerFactory.java         |  9 +------
 3 files changed, 3 insertions(+), 36 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 1820e876718..5b3cf4e9693 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -186,7 +186,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                                 getRequestSlotMatchingStrategy(configuration, 
jobType));
                 break;
             case Adaptive:
-                schedulerNGFactory = 
getAdaptiveSchedulerFactoryFromConfiguration(configuration);
+                schedulerNGFactory = new AdaptiveSchedulerFactory();
                 slotPoolServiceFactory =
                         new DeclarativeSlotPoolServiceFactory(
                                 SystemClock.getInstance(), slotIdleTimeout, 
rpcTimeout);
@@ -281,30 +281,4 @@ public final class DefaultSlotPoolServiceSchedulerFactory
             return SimpleRequestSlotMatchingStrategy.INSTANCE;
         }
     }
-
-    private static AdaptiveSchedulerFactory 
getAdaptiveSchedulerFactoryFromConfiguration(
-            Configuration configuration) {
-        Duration allocationTimeoutDefault = 
JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue();
-        Duration stabilizationTimeoutDefault =
-                
JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue();
-
-        if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
-                == SchedulerExecutionMode.REACTIVE) {
-            allocationTimeoutDefault = Duration.ofMillis(-1);
-            stabilizationTimeoutDefault = Duration.ZERO;
-        }
-
-        final Duration initialResourceAllocationTimeout =
-                configuration
-                        .getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)
-                        .orElse(allocationTimeoutDefault);
-
-        final Duration resourceStabilizationTimeout =
-                configuration
-                        
.getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)
-                        .orElse(stabilizationTimeoutDefault);
-
-        return new AdaptiveSchedulerFactory(
-                initialResourceAllocationTimeout, 
resourceStabilizationTimeout);
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 279bb38def1..5f6438ce181 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -235,7 +235,7 @@ public class AdaptiveScheduler
         private final Duration scalingIntervalMin;
         private final Duration scalingIntervalMax;
 
-        public Settings(
+        private Settings(
                 SchedulerExecutionMode executionMode,
                 Duration initialResourceAllocationTimeout,
                 Duration resourceStabilizationTimeout,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 9e25ab31678..fe8edbac02a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -55,14 +55,7 @@ import java.util.concurrent.ScheduledExecutorService;
 /** Factory for the adaptive scheduler. */
 public class AdaptiveSchedulerFactory implements SchedulerNGFactory {
 
-    private final Duration initialResourceAllocationTimeout;
-    private final Duration resourceStabilizationTimeout;
-
-    public AdaptiveSchedulerFactory(
-            Duration initialResourceAllocationTimeout, Duration 
resourceStabilizationTimeout) {
-        this.initialResourceAllocationTimeout = 
initialResourceAllocationTimeout;
-        this.resourceStabilizationTimeout = resourceStabilizationTimeout;
-    }
+    public AdaptiveSchedulerFactory() {}
 
     @Override
     public SchedulerNG createInstance(

Reply via email to