1996fanrui commented on code in PR #920:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1858342140


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, 
now, newParallelism);
+
+        // Never scale down within scale down interval
+        if 
(now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval)))
 {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down 
interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();

Review Comment:
   It ensures any vertex never scale down when `currentTime - triggerTime < 
scale-down.interval`. 
   
   It means the scale down is only executed after itself is triggered more than 
scale-down.interval.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java:
##########
@@ -19,51 +19,81 @@
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
 import lombok.Getter;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
-    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+    /** The delayed scale down info for vertex. */
+    @Data
+    public static class VertexDelayedScaleDownInfo {
+        private final Instant firstTriggerTime;
+        private int maxRecommendedParallelism;
+
+        @JsonCreator
+        public VertexDelayedScaleDownInfo(
+                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
+                @JsonProperty("maxRecommendedParallelism") int 
maxRecommendedParallelism) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.maxRecommendedParallelism = maxRecommendedParallelism;
+        }
+    }
+
+    @Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> 
delayedVertices;
 
     // Have any scale down request been updated? It doesn't need to be stored, 
it is only used to
     // determine whether DelayedScaleDown needs to be stored.
-    @Getter private boolean isUpdated = false;
+    @JsonIgnore @Getter private boolean updated = false;
 
     public DelayedScaleDown() {
-        this.firstTriggerTime = new HashMap<>();
-    }
-
-    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
-        this.firstTriggerTime = firstTriggerTime;
+        this.delayedVertices = new HashMap<>();
     }
 
-    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
-        return Optional.ofNullable(firstTriggerTime.get(vertex));
-    }
+    /** Trigger a scale down, and return the corresponding {@link 
VertexDelayedScaleDownInfo}. */
+    @Nonnull
+    public VertexDelayedScaleDownInfo triggerScaleDown(
+            JobVertexID vertex, Instant triggerTime, int parallelism) {
+        var vertexInfo = delayedVertices.get(vertex);
+        if (vertexInfo == null) {
+            // It's the first trigger
+            vertexInfo = new VertexDelayedScaleDownInfo(triggerTime, 
parallelism);
+            delayedVertices.put(vertex, vertexInfo);
+            updated = true;
+        } else if (parallelism > vertexInfo.getMaxRecommendedParallelism()) {
+            // Not the first trigger, but the maxRecommendedParallelism needs 
to be updated.
+            vertexInfo.setMaxRecommendedParallelism(parallelism);
+            updated = true;
+        }
 
-    void updateTriggerTime(JobVertexID vertex, Instant instant) {
-        firstTriggerTime.put(vertex, instant);
-        isUpdated = true;
+        return vertexInfo;
     }
 
+    // Clear the delayed scale down for corresponding vertex when the 
recommended parallelism is
+    // greater than or equal to the currentParallelism.
     void clearVertex(JobVertexID vertex) {
-        Instant removed = firstTriggerTime.remove(vertex);
+        VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
         if (removed != null) {
-            isUpdated = true;
+            updated = true;
         }
     }
 
+    // Clear all delayed scale down when rescale happens.
     void clearAll() {
-        if (firstTriggerTime.isEmpty()) {
+        if (delayedVertices.isEmpty()) {
             return;
         }
-        firstTriggerTime.clear();
-        isUpdated = true;
+        delayedVertices.clear();
+        updated = true;
     }

Review Comment:
   We clear all delayed scale down when rescale happens, so the scale down 
won't happen for vertices (that trigger time is less than scale down interval).
   
   Ideally, these vertices will re-trigger scale down after rescaling. It likes 
this demo:
   
   > So if you have 10 vertices and they would be scaled down at different 
times you can have 10 restarts within the scale down window. Which does not 
feel right.
   
   Actually, we only rescale twice (instead of 10 times) if we have 10 vertices 
and they would be scaled down at different times.
   Assuming scale down interval is 1 hour:
   
   - vertex1 triggers scale down at 12:10,
   - vertex2 triggers scale down at 12:15,
   - vertex3 triggers scale down at 12:18,
   - vertex10 triggers scale down at 12:50,
   
   And then vertex1 will be scaled down at 13:10, and rest of them do not be 
changed.
   
   After 13: 10, if all of vertex2 to vertex10 need to scale down, all of them 
will trigger scale down at 13: 11 (next reconcile loop), and will be scaled 
down at 14:11 .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to