1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1319551944

##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -496,6 +556,8 @@ private final class ExecutingStateBuilder {
                 TestingDefaultExecutionGraphBuilder.newBuilder()
                         .build(EXECUTOR_RESOURCE.getExecutor());
         private OperatorCoordinatorHandler operatorCoordinatorHandler;
+        private Duration scalingIntervalMin = Duration.ZERO;

Review Comment:
   The default value should be `SCHEDULER_SCALING_INTERVAL_MIN.defaultValue()`, right?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -47,6 +51,10 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
     private final Context context;
+    private Instant lastRescale = Instant.EPOCH;

Review Comment:
   Actually, I don't understand how `lastRescale` is used in your implementation. A new `Executing` object is created after each rescale, so `lastRescale` is never carried across multiple rescales. We should record `Instant.now()` as `lastRescale` in the constructor, right?
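
   To illustrate the suggestion about `lastRescale`, here is a minimal, self-contained sketch (not Flink code) of a state object that records its own creation time as the last rescale. `ExecutingStateSketch`, `timeSinceLastRescale()` and the injected `java.time.Clock` are hypothetical; the real `Executing` receives a `Context` and does not necessarily take a clock.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical stand-in for the adaptive scheduler's Executing state (not Flink code).
 * A new instance is created after every rescale, so the constructor is where the
 * rescale time is recorded; no Instant.EPOCH sentinel is needed.
 */
final class ExecutingStateSketch {
    private final Clock clock;
    private final Instant lastRescale;

    ExecutingStateSketch(Clock clock) {
        this.clock = clock;
        // Entering this state is the rescale, so "now" is the last rescale time.
        this.lastRescale = clock.instant();
    }

    Instant lastRescale() {
        return lastRescale;
    }

    /** Time elapsed since this state was entered, i.e. since the last rescale. */
    Duration timeSinceLastRescale() {
        return Duration.between(lastRescale, clock.instant());
    }
}
```

   Injecting a `Clock` also lets tests use `Clock.fixed(...)` or `Clock.offset(...)` instead of sleeping.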

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -55,7 +63,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
             Logger logger,
             Context context,
             ClassLoader userCodeClassLoader,
-            List<ExceptionHistoryEntry> failureCollection) {
+            List<ExceptionHistoryEntry> failureCollection,
+            Duration scalingIntervalMin,
+            Duration scalingIntervalMax) {

Review Comment:
   ```suggestion
               @Nullable Duration scalingIntervalMax) {
   ```

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         this.context = context;
         Preconditions.checkState(
                 executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
+        this.scalingIntervalMin = scalingIntervalMin;
+        this.scalingIntervalMax = scalingIntervalMax;
+        Preconditions.checkState(
+                !scalingIntervalMin.isNegative(),
+                "{} must be positive integer or 0",
+                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+        if (scalingIntervalMax != null) {
+            Preconditions.checkState(
+                    scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+                    "{}({}) must be greater than {}({})",
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                    scalingIntervalMax,
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+                    scalingIntervalMin);
+        }
         deploy();
         // check if new resources have come available in the meantime
         context.runIfState(this, this::maybeRescale, Duration.ZERO);

Review Comment:
   Creating a new `Executing` means a rescale has just happened. Even if new resources have already arrived, we should only rescale again once `scalingIntervalMin` has elapsed, right? If so, we should call `rescaleWhenCooldownPeriodIsOver()` here.
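
   A self-contained sketch (not Flink code) of the two ideas in this part of the review: validating the interval pair with `scalingIntervalMax` treated as nullable, and deferring the first rescale check at construction time until `scalingIntervalMin` has elapsed. `ScalingIntervals`, `CooldownAwareState` and `scheduleRescaleAfterCooldown` are hypothetical; the `BiConsumer<Runnable, Duration>` stands in for `context.runIfState(this, action, delay)`, and the body of the PR's `rescaleWhenCooldownPeriodIsOver()` is not shown in this diff. The sketch formats messages with `%s`. Note that Flink's `Preconditions.checkState` substitutes `%s` placeholders, not SLF4J-style `{}`, so the `{}` templates in the hunk above would likely appear literally in the exception message.

```java
import java.time.Duration;
import java.util.function.BiConsumer;

/** Hypothetical helpers for the scaling-interval options (not Flink API). */
final class ScalingIntervals {
    private ScalingIntervals() {}

    /** Rejects a negative minimum and, when a maximum is set, requires max > min. */
    static void validate(Duration min, Duration max /* nullable */) {
        if (min.isNegative()) {
            throw new IllegalStateException(
                    String.format("scaling-interval.min (%s) must not be negative", min));
        }
        if (max != null && max.compareTo(min) <= 0) {
            throw new IllegalStateException(
                    String.format(
                            "scaling-interval.max (%s) must be greater than scaling-interval.min (%s)",
                            max, min));
        }
    }

    /** Time still to wait before the next rescale is allowed; never negative. */
    static Duration remainingCooldown(Duration sinceLastRescale, Duration min) {
        Duration remaining = min.minus(sinceLastRescale);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}

/** Hypothetical state that defers its first rescale check until the cooldown is over. */
final class CooldownAwareState {
    private final Duration scalingIntervalMin;
    // Stand-in for context.runIfState(this, action, delay): runs action after delay.
    private final BiConsumer<Runnable, Duration> scheduler;
    private boolean rescaleScheduled;
    int rescaleChecks; // how often the deferred check ran, for illustration only

    CooldownAwareState(Duration min, Duration max, BiConsumer<Runnable, Duration> scheduler) {
        ScalingIntervals.validate(min, max);
        this.scalingIntervalMin = min;
        this.scheduler = scheduler;
        // Creating this state means a rescale just happened: wait the full cooldown.
        scheduleRescaleAfterCooldown(Duration.ZERO);
    }

    /** Mirrors the intent of rescaleWhenCooldownPeriodIsOver(): at most one pending check. */
    void scheduleRescaleAfterCooldown(Duration sinceLastRescale) {
        if (rescaleScheduled) {
            return;
        }
        rescaleScheduled = true;
        scheduler.accept(
                this::maybeRescale,
                ScalingIntervals.remainingCooldown(sinceLastRescale, scalingIntervalMin));
    }

    private void maybeRescale() {
        rescaleScheduled = false;
        rescaleChecks++; // the real check would call context.shouldRescale(...) here
    }
}
```

   Resource events arriving while a check is already pending are deduplicated by the `rescaleScheduled` flag, which matches the role that flag appears to play in the PR's `maybeRescale()`.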

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
   The implementation of `scalingIntervalMax` may be wrong. As I understand it, `scalingIntervalMax` is meant to trigger a rescale even when `jobmanager.adaptive-scheduler.min-parallelism-increase` isn't met, right? If so, we should set a timer that calls `rescale()` after `scalingIntervalMax`. Without a timer, `rescale()` won't be called once `scalingIntervalMax` has elapsed, and nothing will force the rescale. We should also add a test for this case.
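
   Following the reading above (that `scalingIntervalMax` should allow a rescale even when `jobmanager.adaptive-scheduler.min-parallelism-increase` is not met), this hypothetical, self-contained sketch (not Flink code) arms the missing timer when the state is entered. `MaxIntervalTimerSketch` and its parameters are illustrative names; the `BiConsumer<Runnable, Duration>` stands in for `context.runIfState(this, action, delay)`. As an assumption, the forced path here still requires some parallelism increase, whereas the PR's `shouldForceRescale` does not check this.

```java
import java.time.Duration;
import java.util.function.BiConsumer;

/**
 * Hypothetical sketch (not Flink code): arm a timer for scalingIntervalMax when the state
 * is entered, so a forced rescale check runs even if no further resource event arrives.
 */
final class MaxIntervalTimerSketch {
    private final int currentParallelism;
    private final int minParallelismIncrease;
    private int availableParallelism;
    private boolean restarted;

    MaxIntervalTimerSketch(
            int currentParallelism,
            int minParallelismIncrease,
            Duration scalingIntervalMax, // nullable: no forced rescale when unset
            BiConsumer<Runnable, Duration> scheduler) { // stand-in for context.runIfState(...)
        this.currentParallelism = currentParallelism;
        this.minParallelismIncrease = minParallelismIncrease;
        this.availableParallelism = currentParallelism;
        if (scalingIntervalMax != null) {
            // Without this timer, nothing would run the forced check after scalingIntervalMax.
            scheduler.accept(() -> maybeRescale(true), scalingIntervalMax);
        }
    }

    void onNewResourcesAvailable(int availableParallelism) {
        this.availableParallelism = availableParallelism;
        maybeRescale(false);
    }

    private void maybeRescale(boolean forced) {
        if (restarted) {
            return; // state already left; runIfState(...) would not run the action either
        }
        int increase = availableParallelism - currentParallelism;
        // Normal path: the min-parallelism-increase threshold must be met.
        // Forced path (assumed semantics): any increase suffices once the max interval passed.
        if (increase >= minParallelismIncrease || (forced && increase > 0)) {
            restarted = true; // stands in for context.goToRestarting(...)
        }
    }

    boolean restarted() {
        return restarted;
    }
}
```

   Because the scheduler is injected, the test asked for above can capture the scheduled `Runnable` and fire it directly instead of waiting for `scalingIntervalMax` to pass.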

##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -488,6 +488,23 @@ public enum SchedulerType {
                                     code(SchedulerExecutionMode.REACTIVE.name()))
                             .build());
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> SCHEDULER_SCALING_INTERVAL_MIN =
+            key("jobmanager.adaptive-scheduler.scaling-interval.min")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    // rescaling and let the user increase the value for high workloads
+                    .withDescription(
+                            "Determines the minimum time (in seconds) between scaling operations in reactive mode.");

Review Comment:
   1. `in seconds` can be removed because the type is `Duration` rather than `int`, so the value can be given in any time unit.
   2. Is `in reactive mode` accurate? As I understand it, this option applies to the adaptive scheduler even when reactive mode is disabled.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org