mxm commented on code in PR #920:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1858968749


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
     }
 
     @VisibleForTesting
-    static boolean allRequiredVerticesWithinUtilizationTarget(
+    static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics,
-            Set<JobVertexID> requiredVertices) {
-        // All vertices' ParallelismChange is optional, rescaling will be 
ignored.
-        if (requiredVertices.isEmpty()) {
+            Set<JobVertexID> changedVertices) {
+        // No any vertex is changed.
+        if (changedVertices.isEmpty()) {
             return true;
         }
 
-        for (JobVertexID vertex : requiredVertices) {
+        for (var vertex : changedVertices) {

Review Comment:
   NIT: Why this change? It makes the loop harder to read.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, 
now, newParallelism);
+
+        // Never scale down within scale down interval
+        if 
(now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval)))
 {

Review Comment:
   Could we rename this method to `getFirstScaledUpTime()`?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
     }
 
     @VisibleForTesting
-    static boolean allRequiredVerticesWithinUtilizationTarget(
+    static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics,
-            Set<JobVertexID> requiredVertices) {
-        // All vertices' ParallelismChange is optional, rescaling will be 
ignored.
-        if (requiredVertices.isEmpty()) {
+            Set<JobVertexID> changedVertices) {
+        // No any vertex is changed.

Review Comment:
   ```suggestion
           // No vertices with changed parallelism.
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, 
now, newParallelism);
+
+        // Never scale down within scale down interval
+        if 
(now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval)))
 {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down 
interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            // Using the maximum parallelism within the scale down interval 
window instead of the
+            // latest parallelism when scaling down
+            return 
ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());

Review Comment:
   This is actually quite smart. It's extending the metric window to the duration of the scale-down interval.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to