This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 92d6932ac38780e90bf9e0d659f46dd9e9b26f0e Author: David Moravek <[email protected]> AuthorDate: Mon Nov 6 13:42:58 2023 +0100 [FLINK-33976] Consolidate AdaptiveScheduler options into AdaptiveScheduler#Settings object. --- .../scheduler/adaptive/AdaptiveScheduler.java | 93 ++++++++++++++++------ .../adaptive/AdaptiveSchedulerFactory.java | 3 +- .../runtime/scheduler/adaptive/Executing.java | 4 +- .../adaptive/AdaptiveSchedulerBuilder.java | 4 +- 4 files changed, 74 insertions(+), 30 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index d952848f7c1..c881b9c5f7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -179,6 +179,68 @@ public class AdaptiveScheduler private static final Logger LOG = LoggerFactory.getLogger(AdaptiveScheduler.class); + /** + * Consolidated settings for the adaptive scheduler. This class is used to avoid passing around + * multiple config options. 
+ */ + public static class Settings { + + public static Settings of(Configuration configuration) { + final SchedulerExecutionMode executionMode = + configuration.get(JobManagerOptions.SCHEDULER_MODE); + Duration allocationTimeoutDefault = + JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue(); + Duration stabilizationTimeoutDefault = + JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue(); + if (executionMode == SchedulerExecutionMode.REACTIVE) { + allocationTimeoutDefault = Duration.ofMillis(-1); + stabilizationTimeoutDefault = Duration.ZERO; + } + return new Settings( + executionMode, + configuration + .getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT) + .orElse(allocationTimeoutDefault), + configuration + .getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT) + .orElse(stabilizationTimeoutDefault), + Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT))); + } + + private final SchedulerExecutionMode executionMode; + private final Duration initialResourceAllocationTimeout; + private final Duration resourceStabilizationTimeout; + private final Duration slotIdleTimeout; + + public Settings( + SchedulerExecutionMode executionMode, + Duration initialResourceAllocationTimeout, + Duration resourceStabilizationTimeout, + Duration slotIdleTimeout) { + this.executionMode = executionMode; + this.initialResourceAllocationTimeout = initialResourceAllocationTimeout; + this.resourceStabilizationTimeout = resourceStabilizationTimeout; + this.slotIdleTimeout = slotIdleTimeout; + } + + public SchedulerExecutionMode getExecutionMode() { + return executionMode; + } + + public Duration getInitialResourceAllocationTimeout() { + return initialResourceAllocationTimeout; + } + + public Duration getResourceStabilizationTimeout() { + return resourceStabilizationTimeout; + } + + public Duration getSlotIdleTimeout() { + return slotIdleTimeout; + } + } + + private final Settings settings; private final JobGraph jobGraph; private final JobInfo 
jobInfo; @@ -212,10 +274,6 @@ public class AdaptiveScheduler private final RescalingController forceRescalingController; - private final Duration initialResourceAllocationTimeout; - - private final Duration resourceStabilizationTimeout; - private final ExecutionGraphFactory executionGraphFactory; private State state = new Created(this, LOG); @@ -229,8 +287,6 @@ public class AdaptiveScheduler private BackgroundTask<ExecutionGraph> backgroundTask = BackgroundTask.finishedBackgroundTask(); - private final SchedulerExecutionMode executionMode; - private final DeploymentStateTimeMetrics deploymentTimeMetrics; private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory; @@ -239,9 +295,8 @@ public class AdaptiveScheduler private final JobManagerJobMetricGroup jobManagerJobMetricGroup; - private final Duration slotIdleTimeout; - public AdaptiveScheduler( + Settings settings, JobGraph jobGraph, @Nullable JobResourceRequirements jobResourceRequirements, Configuration configuration, @@ -251,8 +306,6 @@ public class AdaptiveScheduler ClassLoader userCodeClassLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, - Duration initialResourceAllocationTimeout, - Duration resourceStabilizationTimeout, JobManagerJobMetricGroup jobManagerJobMetricGroup, RestartBackoffTimeStrategy restartBackoffTimeStrategy, long initializationTimestamp, @@ -265,12 +318,12 @@ public class AdaptiveScheduler assertPreconditions(jobGraph); + this.settings = settings; this.jobGraph = jobGraph; this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), jobGraph.getName()); - this.executionMode = configuration.get(JobManagerOptions.SCHEDULER_MODE); VertexParallelismStore vertexParallelismStore = - computeVertexParallelismStore(jobGraph, executionMode); + computeVertexParallelismStore(jobGraph, settings.getExecutionMode()); if (jobResourceRequirements != null) { vertexParallelismStore = DefaultVertexParallelismStore.applyJobResourceRequirements( @@ -305,10 
+358,6 @@ public class AdaptiveScheduler this.forceRescalingController = new EnforceParallelismChangeRescalingController(); - this.initialResourceAllocationTimeout = initialResourceAllocationTimeout; - - this.resourceStabilizationTimeout = resourceStabilizationTimeout; - this.executionGraphFactory = executionGraphFactory; final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp); @@ -336,8 +385,6 @@ public class AdaptiveScheduler this.exceptionHistory = new BoundedFIFOQueue<>(configuration.get(WebOptions.MAX_EXCEPTION_HISTORY_SIZE)); this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; - this.slotIdleTimeout = - Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)); } private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException { @@ -819,7 +866,7 @@ public class AdaptiveScheduler @Override public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) { - if (executionMode == SchedulerExecutionMode.REACTIVE) { + if (settings.getExecutionMode() == SchedulerExecutionMode.REACTIVE) { throw new UnsupportedOperationException( "Cannot change the parallelism of a job running in reactive mode."); } @@ -910,8 +957,8 @@ public class AdaptiveScheduler new WaitingForResources.Factory( this, LOG, - this.initialResourceAllocationTimeout, - this.resourceStabilizationTimeout, + settings.getInitialResourceAllocationTimeout(), + settings.getResourceStabilizationTimeout(), previousExecutionGraph)); } @@ -1078,7 +1125,7 @@ public class AdaptiveScheduler adjustedParallelismStore = computeVertexParallelismStoreForExecution( adjustedJobGraph, - executionMode, + settings.getExecutionMode(), (vertex) -> { VertexParallelismInformation vertexParallelismInfo = initialParallelismStore.getParallelismInfo(vertex.getID()); @@ -1358,7 +1405,7 @@ public class AdaptiveScheduler getMainThreadExecutor() .schedule( this::checkIdleSlotTimeout, - slotIdleTimeout.toMillis(), + 
settings.getSlotIdleTimeout().toMillis(), TimeUnit.MILLISECONDS); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java index 7e3199d2398..9e25ab31678 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java @@ -127,6 +127,7 @@ public class AdaptiveSchedulerFactory implements SchedulerNGFactory { partitionTracker); return new AdaptiveScheduler( + AdaptiveScheduler.Settings.of(jobMasterConfiguration), jobGraph, JobResourceRequirements.readFromJobGraph(jobGraph).orElse(null), jobMasterConfiguration, @@ -136,8 +137,6 @@ public class AdaptiveSchedulerFactory implements SchedulerNGFactory { userCodeLoader, new CheckpointsCleaner(), checkpointRecoveryFactory, - initialResourceAllocationTimeout, - resourceStabilizationTimeout, jobManagerJobMetricGroup, restartBackoffTimeStrategy, initializationTimestamp, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index 10f872bbbd5..8298d344711 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; @@ -243,8 +244,7 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { schedulingProvider.stopCheckpointScheduler(); final CompletableFuture<String> savepointFuture = - 
executionGraph - .getCheckpointCoordinator() + Objects.requireNonNull(executionGraph.getCheckpointCoordinator()) .triggerSynchronousSavepoint(terminate, targetDirectory, formatType) .thenApply(CompletedCheckpoint::getExternalPointer); return context.goToStopWithSavepoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index f17c9c1ccc5..b814144ad96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; @@ -215,6 +214,7 @@ public class AdaptiveSchedulerBuilder { partitionTracker); return new AdaptiveScheduler( + AdaptiveScheduler.Settings.of(jobMasterConfiguration), jobGraph, jobResourceRequirements, jobMasterConfiguration, @@ -227,8 +227,6 @@ public class AdaptiveSchedulerBuilder { userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, - jobMasterConfiguration.get(JobManagerOptions.RESOURCE_WAIT_TIMEOUT), - jobMasterConfiguration.get(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT), jobManagerJobMetricGroup, restartBackoffTimeStrategy, initializationTimestamp,
