1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1319551944


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -496,6 +556,8 @@ private final class ExecutingStateBuilder {
                 TestingDefaultExecutionGraphBuilder.newBuilder()
                         .build(EXECUTOR_RESOURCE.getExecutor());
         private OperatorCoordinatorHandler operatorCoordinatorHandler;
+        private Duration scalingIntervalMin = Duration.ZERO;

Review Comment:
   The default value should be `SCHEDULER_SCALING_INTERVAL_MIN.defaultValue()`, right?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -47,6 +51,10 @@
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
     private final Context context;
+    private Instant lastRescale = Instant.EPOCH;

Review Comment:
   Actually, I don't understand how `lastRescale` works in your implementation. A new `Executing` object is created after each rescale, so `lastRescale` is never carried over from one rescale to the next.
   
   We should record `Instant.now()` as `lastRescale` in the constructor, right?
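
   A minimal sketch of that idea, assuming a stand-in class rather than the real `Executing`: `ExecutingSketch` and its explicit `createdAt` / `now` parameters are hypothetical and exist only to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-in for the state object; not the real Flink Executing class. */
final class ExecutingSketch {
    // A new state object is created after every rescale, so its creation time
    // is the time of the last rescale. It is captured once and never reassigned.
    private final Instant lastRescale;

    ExecutingSketch(Instant createdAt) {
        this.lastRescale = createdAt;
    }

    /** Elapsed time between the creation of this state and the given instant. */
    Duration timeSinceLastRescale(Instant now) {
        return Duration.between(lastRescale, now);
    }
}
```

   In production code the constructor would presumably pass `Instant.now()`; taking the instant as a parameter only makes the behaviour easy to test.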



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -55,7 +63,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
             Logger logger,
             Context context,
             ClassLoader userCodeClassLoader,
-            List<ExceptionHistoryEntry> failureCollection) {
+            List<ExceptionHistoryEntry> failureCollection,
+            Duration scalingIntervalMin,
+            Duration scalingIntervalMax) {

Review Comment:
   ```suggestion
               @Nullable Duration scalingIntervalMax) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         this.context = context;
         Preconditions.checkState(
                 executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
+        this.scalingIntervalMin = scalingIntervalMin;
+        this.scalingIntervalMax = scalingIntervalMax;
+        Preconditions.checkState(
+                !scalingIntervalMin.isNegative(),
+                "{} must be positive integer or 0",
+                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+        if (scalingIntervalMax != null) {
+            Preconditions.checkState(
+                    scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+                    "{}({}) must be greater than {}({})",
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                    scalingIntervalMax,
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+                    scalingIntervalMin);
+        }
 
         deploy();
 
         // check if new resources have come available in the meantime
         context.runIfState(this, this::maybeRescale, Duration.ZERO);

Review Comment:
   A newly created `Executing` state means a rescale has just happened. Even if new resources have already arrived, we can only rescale again after `scalingIntervalMin`, right? If so, we should call `rescaleWhenCooldownPeriodIsOver()` here.
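
   One way to picture the cooldown check, as a sketch only: `CooldownSketch` and `remainingCooldown` are hypothetical names, not part of the PR. The helper returns how long to wait before the next rescale is allowed.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; illustrates the cooldown arithmetic only. */
final class CooldownSketch {
    /**
     * Returns the time still to wait before a rescale is allowed,
     * or Duration.ZERO if scalingIntervalMin has already elapsed.
     */
    static Duration remainingCooldown(
            Instant lastRescale, Instant now, Duration scalingIntervalMin) {
        Duration elapsed = Duration.between(lastRescale, now);
        Duration remaining = scalingIntervalMin.minus(elapsed);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

   The result could then serve as the delay argument in a call shaped like the existing `context.runIfState(this, this::maybeRescale, Duration.ZERO)`, replacing the fixed `Duration.ZERO`.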
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
   The implementation of `scalingIntervalMax` may be wrong.
   
   As I understand it, `scalingIntervalMax` is meant to trigger a rescale even when `jobmanager.adaptive-scheduler.min-parallelism-increase` isn't met, right?
   
   If so, we should set a timer that calls `rescale()` after `scalingIntervalMax`, right? Without a timer, nothing calls `rescale` once `scalingIntervalMax` has elapsed, so the forced rescale never happens.
   
   Also, we should add a test for this case.
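
   A rough sketch of the timer idea, under stated assumptions: `DelayedRunner` is a hypothetical abstraction over whatever delayed-execution facility the state has (for instance something like `context.runIfState(...)` with a delay), and `ForcedRescaleSketch` is not a class from the PR.

```java
import java.time.Duration;

/** Hypothetical abstraction over a delayed-execution facility. */
interface DelayedRunner {
    void runAfter(Runnable action, Duration delay);
}

/** Hypothetical helper; shows only where the timer would be armed. */
final class ForcedRescaleSketch {
    /**
     * Arms a timer that runs forceRescale once scalingIntervalMax has elapsed.
     * Returns false and does nothing when no maximum interval is configured.
     */
    static boolean scheduleForcedRescale(
            DelayedRunner runner, Duration scalingIntervalMax, Runnable forceRescale) {
        if (scalingIntervalMax == null) {
            return false; // option unset: no forced rescale
        }
        runner.runAfter(forceRescale, scalingIntervalMax);
        return true;
    }
}
```

   A test for this case could pass a `DelayedRunner` fake that records the requested delay and runs the action immediately, then assert that the forced rescale was triggered exactly once.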



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -488,6 +488,23 @@ public enum SchedulerType {
                                             code(SchedulerExecutionMode.REACTIVE.name()))
                                     .build());
 
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> SCHEDULER_SCALING_INTERVAL_MIN =
+            key("jobmanager.adaptive-scheduler.scaling-interval.min")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    // rescaling and let the user increase the value for high workloads
+                    .withDescription(
+                            "Determines the minimum time (in seconds) between scaling operations in reactive mode.");

Review Comment:
   1. `in seconds` can be removed because the type is `Duration` rather than int, so the value can be given in any time unit.
   2. Is `in reactive mode` accurate? As I understand it, this option applies to the adaptive scheduler even if reactive mode is disabled.


