mxm commented on code in PR #920:
URL: https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1858997326


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+
+        // Never scale down within scale down interval
+        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            // Using the maximum parallelism within the scale down interval window instead of the
+            // latest parallelism when scaling down
+            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());

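For context, below is a minimal sketch of the per-vertex bookkeeping that the new delayedScaleDown.triggerScaleDown(...) call implies: remember the first trigger time within the scale-down window and the maximum recommended parallelism seen since then. All class, field, and method names here are assumptions for illustration only; the PR's actual DelayedScaleDown implementation may differ.

    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;

    // Hedged sketch, not the PR's actual implementation: per-vertex state that
    // would support the applyScaleDownInterval logic above. Names are assumptions.
    final class DelayedScaleDownSketch {

        static final class Info {
            final Instant firstTriggerTime;   // when scale down was first requested
            int maxRecommendedParallelism;    // max recommendation seen in the window

            Info(Instant firstTriggerTime, int parallelism) {
                this.firstTriggerTime = firstTriggerTime;
                this.maxRecommendedParallelism = parallelism;
            }
        }

        private final Map<String, Info> perVertex = new HashMap<>();

        // The first call within a window records the trigger time; later calls only
        // raise the remembered maximum recommended parallelism.
        Info triggerScaleDown(String vertex, Instant now, int newParallelism) {
            Info info = perVertex.computeIfAbsent(vertex, v -> new Info(now, newParallelism));
            info.maxRecommendedParallelism =
                    Math.max(info.maxRecommendedParallelism, newParallelism);
            return info;
        }

        // Reset the window once a scale down is applied or no longer needed.
        void clearVertex(String vertex) {
            perVertex.remove(vertex);
        }
    }
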
Review Comment:
   Can we make sure, though, that the recommended scale-down parallelism is not higher than the current one? This could theoretically happen if the recommended parallelism rises above the current parallelism at some point during the scale-down period, since the maximum over that window is what gets applied.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
