gyfora commented on code in PR #922:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/922#discussion_r1881460715


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java:
##########
@@ -30,23 +30,78 @@
 
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
+    @Data
+    private static class RecommendedParallelism {
+        @Nonnull private final Instant triggerTime;
+        private final int parallelism;
+
+        @JsonCreator
+        public RecommendedParallelism(
+                @Nonnull @JsonProperty("triggerTime") Instant triggerTime,
+                @JsonProperty("parallelism") int parallelism) {
+            this.triggerTime = triggerTime;
+            this.parallelism = parallelism;
+        }
+    }
+
     /** The delayed scale down info for vertex. */
     @Data
     public static class VertexDelayedScaleDownInfo {
         private final Instant firstTriggerTime;
-        private int maxRecommendedParallelism;
+        // TODO: add a comment explaining how the max parallelism within the sliding
+        // window is calculated.
+        private final LinkedList<RecommendedParallelism> 
recommendedParallelisms;
+
+        public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.recommendedParallelisms = new LinkedList<>();
+        }
 
         @JsonCreator
         public VertexDelayedScaleDownInfo(
                 @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
-                @JsonProperty("maxRecommendedParallelism") int 
maxRecommendedParallelism) {
+                @JsonProperty("recommendedParallelisms")
+                        LinkedList<RecommendedParallelism> 
recommendedParallelisms) {
             this.firstTriggerTime = firstTriggerTime;
-            this.maxRecommendedParallelism = maxRecommendedParallelism;
+            this.recommendedParallelisms = recommendedParallelisms;
+        }
+
+        /** Record current recommended parallelism. */
+        public void recordRecommendedParallelism(Instant triggerTime, int 
parallelism) {
+
+            // Remove all recommended parallelisms that are lower than or equal to the
+            // latest parallelism.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms.peekLast().getParallelism() <= 
parallelism) {
+                recommendedParallelisms.pollLast();
+            }

Review Comment:
   This is a good optimization 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to