This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5ae48935e665cf2e7dc3b1e65b2968caa40cf348 Author: David Moravek <[email protected]> AuthorDate: Tue Jan 16 18:42:59 2024 +0100 fixup! [FLINK-33976] Consolidate AdaptiveScheduler options into AdaptiveScheduler#Settings object. --- .../DefaultSlotPoolServiceSchedulerFactory.java | 28 +--------------------- .../scheduler/adaptive/AdaptiveScheduler.java | 2 +- .../adaptive/AdaptiveSchedulerFactory.java | 9 +------ 3 files changed, 3 insertions(+), 36 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 1820e876718..5b3cf4e9693 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -186,7 +186,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory getRequestSlotMatchingStrategy(configuration, jobType)); break; case Adaptive: - schedulerNGFactory = getAdaptiveSchedulerFactoryFromConfiguration(configuration); + schedulerNGFactory = new AdaptiveSchedulerFactory(); slotPoolServiceFactory = new DeclarativeSlotPoolServiceFactory( SystemClock.getInstance(), slotIdleTimeout, rpcTimeout); @@ -281,30 +281,4 @@ public final class DefaultSlotPoolServiceSchedulerFactory return SimpleRequestSlotMatchingStrategy.INSTANCE; } } - - private static AdaptiveSchedulerFactory getAdaptiveSchedulerFactoryFromConfiguration( - Configuration configuration) { - Duration allocationTimeoutDefault = JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue(); - Duration stabilizationTimeoutDefault = - JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue(); - - if (configuration.get(JobManagerOptions.SCHEDULER_MODE) - == SchedulerExecutionMode.REACTIVE) { - allocationTimeoutDefault = Duration.ofMillis(-1); - 
stabilizationTimeoutDefault = Duration.ZERO; - } - - final Duration initialResourceAllocationTimeout = - configuration - .getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT) - .orElse(allocationTimeoutDefault); - - final Duration resourceStabilizationTimeout = - configuration - .getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT) - .orElse(stabilizationTimeoutDefault); - - return new AdaptiveSchedulerFactory( - initialResourceAllocationTimeout, resourceStabilizationTimeout); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 279bb38def1..5f6438ce181 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -235,7 +235,7 @@ public class AdaptiveScheduler private final Duration scalingIntervalMin; private final Duration scalingIntervalMax; - public Settings( + private Settings( SchedulerExecutionMode executionMode, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java index 9e25ab31678..fe8edbac02a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java @@ -55,14 +55,7 @@ import java.util.concurrent.ScheduledExecutorService; /** Factory for the adaptive scheduler. 
*/ public class AdaptiveSchedulerFactory implements SchedulerNGFactory { - private final Duration initialResourceAllocationTimeout; - private final Duration resourceStabilizationTimeout; - - public AdaptiveSchedulerFactory( - Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout) { - this.initialResourceAllocationTimeout = initialResourceAllocationTimeout; - this.resourceStabilizationTimeout = resourceStabilizationTimeout; - } + public AdaptiveSchedulerFactory() {} @Override public SchedulerNG createInstance(
