This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 6e16bac00caf631ac6f54cdd8f06c67bfdd82255
Author: Rui Fan <fan...@apache.org>
AuthorDate: Fri Dec 6 16:41:54 2024 +0800

    [FLINK-36863][autoscaler] Use the maximum parallelism in the past scale-down.interval window when scaling down
---
 .../apache/flink/autoscaler/DelayedScaleDown.java  | 104 ++++++-
 .../apache/flink/autoscaler/JobVertexScaler.java   |  77 ++++-
 .../apache/flink/autoscaler/ScalingExecutor.java   |  51 +---
 .../flink/autoscaler/DelayedScaleDownTest.java     | 173 ++++++++++-
 .../flink/autoscaler/JobVertexScalerTest.java      |  52 ++--
 .../flink/autoscaler/ScalingExecutorTest.java      | 319 +++++++++++++--------
 .../state/AbstractAutoScalerStateStoreTest.java    |   5 +-
 7 files changed, 544 insertions(+), 237 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
index b5a0dba7..489bbc9f 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -30,23 +30,96 @@ import javax.annotation.Nonnull;
 
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
+    /** Details of the recommended parallelism. */
+    @Data
+    public static class RecommendedParallelism {
+        @Nonnull private final Instant triggerTime;
+        private final int parallelism;
+        private final boolean outsideUtilizationBound;
+
+        @JsonCreator
+        public RecommendedParallelism(
+                @Nonnull @JsonProperty("triggerTime") Instant triggerTime,
+                @JsonProperty("parallelism") int parallelism,
+                @JsonProperty("outsideUtilizationBound") boolean 
outsideUtilizationBound) {
+            this.triggerTime = triggerTime;
+            this.parallelism = parallelism;
+            this.outsideUtilizationBound = outsideUtilizationBound;
+        }
+    }
+
     /** The delayed scale down info for vertex. */
     @Data
     public static class VertexDelayedScaleDownInfo {
         private final Instant firstTriggerTime;
-        private int maxRecommendedParallelism;
+
+        /**
+         * In theory, this maintains every recommended parallelism observed within the past
+         * `scale-down.interval` window, and all recommended parallelisms recorded before the
+         * window start time are evicted.
+         *
+         * <p>Also, if the latest parallelism is greater than a past parallelism, the smaller
+         * past parallelisms can never become the max recommended parallelism, so they can be
+         * evicted as well. This is the standard optimization for computing the maximum over a
+         * sliding window: it suffices to maintain a list of monotonically decreasing
+         * parallelisms within the window, and the first element is then the max recommended
+         * parallelism within the past `scale-down.interval` window.
+         */
+        private final LinkedList<RecommendedParallelism> recommendedParallelisms;
+
+        public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.recommendedParallelisms = new LinkedList<>();
+        }
 
         @JsonCreator
         public VertexDelayedScaleDownInfo(
                 @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
-                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+                @JsonProperty("recommendedParallelisms")
+                        LinkedList<RecommendedParallelism> recommendedParallelisms) {
             this.firstTriggerTime = firstTriggerTime;
-            this.maxRecommendedParallelism = maxRecommendedParallelism;
+            this.recommendedParallelisms = recommendedParallelisms;
+        }
+
+        /** Record the current recommended parallelism. */
+        public void recordRecommendedParallelism(
+                Instant triggerTime, int parallelism, boolean outsideUtilizationBound) {
+            // Evict all recommended parallelisms that are lower than or equal to the latest
+            // parallelism. When a past parallelism is equal to the latest parallelism, its
+            // triggerTime needs to be updated, so it also needs to be evicted.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
+                recommendedParallelisms.pollLast();
+            }
+
+            recommendedParallelisms.addLast(
+                    new RecommendedParallelism(triggerTime, parallelism, outsideUtilizationBound));
+        }
+
+        @JsonIgnore
+        public RecommendedParallelism getMaxRecommendedParallelism(Instant windowStartTime) {
+            // Evict all recommended parallelisms before the window start time.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms
+                            .peekFirst()
+                            .getTriggerTime()
+                            .isBefore(windowStartTime)) {
+                recommendedParallelisms.pollFirst();
+            }
+
+            var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
+            checkState(
+                    maxRecommendedParallelism != null,
+                    "The getMaxRecommendedParallelism should be called after 
triggering a scale down, it may be a bug.");
+            return maxRecommendedParallelism;
         }
     }
 
@@ -63,18 +136,19 @@ public class DelayedScaleDown {
     /** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
     @Nonnull
     public VertexDelayedScaleDownInfo triggerScaleDown(
-            JobVertexID vertex, Instant triggerTime, int parallelism) {
-        var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
-        if (vertexDelayedScaleDownInfo == null) {
-            // It's the first trigger
-            vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
-            delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
-            updated = true;
-        } else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
-            // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
-            vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
-            updated = true;
-        }
+            JobVertexID vertex,
+            Instant triggerTime,
+            int parallelism,
+            boolean outsideUtilizationBound) {
+        // The vertexDelayedScaleDownInfo is updated every time a scale down is triggered,
+        // because the triggerTime needs to be updated each time.
+        updated = true;
+
+        var vertexDelayedScaleDownInfo =
+                delayedVertices.computeIfAbsent(
+                        vertex, k -> new VertexDelayedScaleDownInfo(triggerTime));
+        vertexDelayedScaleDownInfo.recordRecommendedParallelism(
+                triggerTime, parallelism, outsideUtilizationBound);
 
         return vertexDelayedScaleDownInfo;
     }
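
The eviction logic above is the classic monotonically decreasing deque used to
answer sliding-window maximum queries, where both operations run in amortized
O(1). A minimal standalone sketch of the same technique (the class and method
names below are illustrative, not part of this commit):

    import java.time.Instant;
    import java.util.ArrayDeque;
    import java.util.Deque;

    class SlidingWindowMax {
        private record Entry(Instant time, int value) {}

        private final Deque<Entry> deque = new ArrayDeque<>();

        /** Record a value; evict older values <= it, since they can never be the max again. */
        void record(Instant time, int value) {
            while (!deque.isEmpty() && deque.peekLast().value() <= value) {
                deque.pollLast();
            }
            deque.addLast(new Entry(time, value));
        }

        /** Max over [windowStart, now]: evict expired entries at the head, then peek. */
        int max(Instant windowStart) {
            while (!deque.isEmpty() && deque.peekFirst().time().isBefore(windowStart)) {
                deque.pollFirst();
            }
            if (deque.isEmpty()) {
                throw new IllegalStateException("No value recorded within the window.");
            }
            return deque.peekFirst().value();
        }
    }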
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 41075b7f..4c185f89 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -56,6 +56,8 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESS
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -92,12 +94,15 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
+        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1, false);
 
         private final int newParallelism;
 
-        private ParallelismChange(int newParallelism) {
+        private final boolean outsideUtilizationBound;
+
+        private ParallelismChange(int newParallelism, boolean outsideUtilizationBound) {
             this.newParallelism = newParallelism;
+            this.outsideUtilizationBound = outsideUtilizationBound;
         }
 
         public boolean isNoChange() {
@@ -113,24 +118,29 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
                 return false;
             }
             ParallelismChange that = (ParallelismChange) o;
-            return newParallelism == that.newParallelism;
+            return newParallelism == that.newParallelism
+                    && outsideUtilizationBound == that.outsideUtilizationBound;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(newParallelism);
+            return Objects.hash(newParallelism, outsideUtilizationBound);
         }
 
         @Override
         public String toString() {
             return isNoChange()
                     ? "NoParallelismChange"
-                    : "ParallelismChange{newParallelism=" + newParallelism + 
'}';
+                    : "ParallelismChange{newParallelism="
+                            + newParallelism
+                            + ", outsideUtilizationBound="
+                            + outsideUtilizationBound
+                            + "}";
         }
 
-        public static ParallelismChange build(int newParallelism) {
+        public static ParallelismChange build(int newParallelism, boolean outsideUtilizationBound) {
             checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
-            return new ParallelismChange(newParallelism);
+            return new ParallelismChange(newParallelism, outsideUtilizationBound);
         }
 
         public static ParallelismChange noChange() {
@@ -239,6 +249,8 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
                 currentParallelism != newParallelism,
                 "The newParallelism is equal to currentParallelism, no scaling 
is needed. This is probably a bug.");
 
+        var outsideUtilizationBound = outsideUtilizationBound(vertex, evaluatedMetrics);
+
         var scaledUp = currentParallelism < newParallelism;
 
         if (scaledUp) {
@@ -248,7 +260,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
             // If we don't have past scaling actions for this vertex, don't block scale up.
             if (history.isEmpty()) {
-                return ParallelismChange.build(newParallelism);
+                return ParallelismChange.build(newParallelism, outsideUtilizationBound);
             }
 
             var lastSummary = history.get(history.lastKey());
@@ -260,28 +272,59 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
                 return ParallelismChange.noChange();
             }
 
-            return ParallelismChange.build(newParallelism);
+            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
+        } else {
+            return applyScaleDownInterval(
+                    delayedScaleDown, vertex, conf, newParallelism, outsideUtilizationBound);
+        }
+    }
+
+    private static boolean outsideUtilizationBound(
+            JobVertexID vertex, Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
+        double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+        double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+        double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
+        if (trueProcessingRate < scaleUpRateThreshold
+                || trueProcessingRate > scaleDownRateThreshold) {
+            LOG.debug(
+                    "Vertex {} processing rate {} is outside ({}, {})",
+                    vertex,
+                    trueProcessingRate,
+                    scaleUpRateThreshold,
+                    scaleDownRateThreshold);
+            return true;
         } else {
-            return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
+            LOG.debug(
+                    "Vertex {} processing rate {} is within target ({}, {})",
+                    vertex,
+                    trueProcessingRate,
+                    scaleUpRateThreshold,
+                    scaleDownRateThreshold);
         }
+        return false;
     }
 
     private ParallelismChange applyScaleDownInterval(
             DelayedScaleDown delayedScaleDown,
             JobVertexID vertex,
             Configuration conf,
-            int newParallelism) {
+            int newParallelism,
+            boolean outsideUtilizationBound) {
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disabled, so don't block scaling.
-            return ParallelismChange.build(newParallelism);
+            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
         }
 
         var now = clock.instant();
-        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+        var windowStartTime = now.minus(scaleDownInterval);
+        var delayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(
+                        vertex, now, newParallelism, outsideUtilizationBound);
 
         // Never scale down within scale down interval
-        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+        if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
             if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
                 LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
             } else {
@@ -293,7 +336,11 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
         } else {
             // Using the maximum parallelism within the scale down interval window instead of the
             // latest parallelism when scaling down
-            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
+            var maxRecommendedParallelism =
+                    delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
+            return ParallelismChange.build(
+                    maxRecommendedParallelism.getParallelism(),
+                    maxRecommendedParallelism.isOutsideUtilizationBound());
         }
     }
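
To make the window check in applyScaleDownInterval concrete, here is an
illustrative walk-through assuming a 1-hour scale-down.interval (the class
name and timestamps below are made up for the example):

    import java.time.Duration;
    import java.time.Instant;

    public class ScaleDownWindowExample {
        public static void main(String[] args) {
            Instant firstTriggerTime = Instant.parse("2024-12-06T10:00:00Z");
            Duration scaleDownInterval = Duration.ofHours(1);

            // 30 minutes after the first trigger: the window start (09:30) is
            // still before the first trigger time (10:00), so the scale down
            // stays delayed.
            Instant now = Instant.parse("2024-12-06T10:30:00Z");
            System.out.println(now.minus(scaleDownInterval).isBefore(firstTriggerTime)); // true

            // 90 minutes after the first trigger: the window start (10:30) is
            // no longer before the first trigger time, so the vertex scales
            // down to the maximum recommendation observed in [10:30, 11:30].
            now = Instant.parse("2024-12-06T11:30:00Z");
            System.out.println(now.minus(scaleDownInterval).isBefore(firstTriggerTime)); // false
        }
    }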
 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index af8384c4..a9b04276 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -47,8 +47,8 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
@@ -57,9 +57,6 @@ import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_E
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
 public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
@@ -178,44 +175,6 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                                                 scalingSummary.getNewParallelism())));
     }
 
-    @VisibleForTesting
-    static boolean allChangedVerticesWithinUtilizationTarget(
-            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
-            Set<JobVertexID> changedVertices) {
-        // No vertices with changed parallelism.
-        if (changedVertices.isEmpty()) {
-            return true;
-        }
-
-        for (JobVertexID vertex : changedVertices) {
-            var metrics = evaluatedMetrics.get(vertex);
-
-            double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
-            double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
-            double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
-
-            if (trueProcessingRate < scaleUpRateThreshold
-                    || trueProcessingRate > scaleDownRateThreshold) {
-                LOG.debug(
-                        "Vertex {} processing rate {} is outside ({}, {})",
-                        vertex,
-                        trueProcessingRate,
-                        scaleUpRateThreshold,
-                        scaleDownRateThreshold);
-                return false;
-            } else {
-                LOG.debug(
-                        "Vertex {} processing rate {} is within target ({}, 
{})",
-                        vertex,
-                        trueProcessingRate,
-                        scaleUpRateThreshold,
-                        scaleDownRateThreshold);
-            }
-        }
-        LOG.info("All vertex processing rates are within target.");
-        return true;
-    }
-
     @VisibleForTesting
     Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Context context,
@@ -235,6 +194,7 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
 
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        AtomicBoolean anyVertexOutsideBound = new AtomicBoolean(false);
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -260,6 +220,9 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                                 if (parallelismChange.isNoChange()) {
                                     return;
                                 }
+                                if (parallelismChange.isOutsideUtilizationBound()) {
+                                    anyVertexOutsideBound.set(true);
+                                }
                                 out.put(
                                         v,
                                         new ScalingSummary(
@@ -270,8 +233,8 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                         });
 
         // If the utilization of all tasks is within range, we can skip scaling.
-        if (allChangedVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), out.keySet())) {
+        if (!anyVertexOutsideBound.get()) {
+            LOG.info("All vertex processing rates are within target.");
             return Map.of();
         }
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
index f27ad5b0..8adc62bb 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
@@ -24,12 +24,153 @@ import org.junit.jupiter.api.Test;
 import java.time.Instant;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link DelayedScaleDown}. */
 public class DelayedScaleDownTest {
 
     private final JobVertexID vertex = new JobVertexID();
 
+    @Test
+    void testWrongWindowStartTime() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+
+        // The first trigger time is used as the trigger time, and it won't be updated.
+        var vertexDelayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5, true);
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
+
+        // Get the max recommended parallelism from a wrong window: no recommended
+        // parallelism exists since the window start, so an exception is expected.
+        assertThatThrownBy(
+                        () ->
+                                vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(
+                                        instant.plusSeconds(1)))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void testMaxRecommendedParallelismForInitialWindow() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+        assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+        delayedScaleDown.triggerScaleDown(vertex, instant, 5, true);
+        delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 9, false);
+        delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 4, true);
+        delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 5, true);
+        var vertexDelayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 6, true);
+
+        // [5, 9, 4, 5, 6] -> 9
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 9, false),
+                instant);
+        // 5, [9, 4, 5, 6] -> 9
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 9, false),
+                instant.plusSeconds(1));
+        // 5, 9, [4, 5, 6] -> 6
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+                instant.plusSeconds(2));
+        // 5, 9, 4, [5, 6] -> 6
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+                instant.plusSeconds(3));
+        // 5, 9, 4, 5, [6] -> 6
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+                instant.plusSeconds(4));
+    }
+
+    @Test
+    void testMaxRecommendedParallelismForSlidingWindow() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+        assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+        // [5] -> 5
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
+        // [5, 8] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 8, false),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant);
+        // [5, 8, 6] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 6, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant);
+        // [5, 8, 6, 4] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 4, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant);
+        // 5, [8, 6, 4, 3] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant.plusSeconds(1));
+        // 5, 8, [6, 4, 3, 3] -> 6
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(2), 6, true),
+                instant.plusSeconds(2));
+        // 5, 8, 6, [4, 3, 3, 3] -> 4
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(6), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(3), 4, true),
+                instant.plusSeconds(3));
+
+        // Check that the timestamp of the latest parallelism is maintained correctly when the
+        // recommended parallelism stays the same.
+        // 5, 8, 6, 4, [3, 3, 3, 3] -> 3
+        // 5, 8, 6, 4, 3, [3, 3, 3] -> 3
+        // 5, 8, 6, 4, 3, 3, [3, 3] -> 3
+        // 5, 8, 6, 4, 3, 3, 3, [3] -> 3
+        var vertexDelayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(7), 3, true);
+        for (int offset = 4; offset <= 7; offset++) {
+            assertVertexDelayedScaleDownInfo(
+                    vertexDelayedScaleDownInfo,
+                    instant,
+                    new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(7), 3, true),
+                    instant.plusSeconds(offset));
+        }
+        // 5, 8, 6, 4, 3, 3, 3, [3, 9] -> 9
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(9), 9, false),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(9), 9, false),
+                instant.plusSeconds(7));
+    }
+
     @Test
     void testTriggerUpdateAndClean() {
         var instant = Instant.now();
@@ -38,40 +179,52 @@ public class DelayedScaleDownTest {
 
         // The first trigger time is used as the trigger time, and it won't be updated.
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
         assertThat(delayedScaleDown.isUpdated()).isTrue();
 
         // The lower parallelism doesn't update the result
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
 
         // The higher parallelism will update the result
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(10), 8, true),
+                instant);
 
         // The scale down could be re-triggered again after clean
         delayedScaleDown.clearVertex(vertex);
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4, true),
                 instant.plusSeconds(15),
-                4);
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(15), 4, true),
+                instant);
 
         // The scale down could be re-triggered again after cleanAll
         delayedScaleDown.clearAll();
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
-                instant.plusSeconds(15),
-                2);
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(16), 2, true),
+                instant.plusSeconds(16),
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(16), 2, true),
+                instant);
     }
 
     void assertVertexDelayedScaleDownInfo(
             DelayedScaleDown.VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo,
             Instant expectedTriggerTime,
-            int expectedMaxRecommendedParallelism) {
+            DelayedScaleDown.RecommendedParallelism expectedMaxRecommendedParallelism,
+            Instant windowStartTime) {
         assertThat(vertexDelayedScaleDownInfo.getFirstTriggerTime()).isEqualTo(expectedTriggerTime);
-        assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism())
+        assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime))
                 .isEqualTo(expectedMaxRecommendedParallelism);
     }
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index af01ce34..9cdc7159 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -104,7 +104,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(5),
+                ParallelismChange.build(5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -116,7 +116,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.build(8),
+                ParallelismChange.build(8, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -140,7 +140,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.build(8),
+                ParallelismChange.build(8, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -151,7 +151,7 @@ public class JobVertexScalerTest {
                         delayedScaleDown));
 
         assertEquals(
-                ParallelismChange.build(8),
+                ParallelismChange.build(8, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -163,7 +163,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, 0.5);
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -175,7 +175,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, 0.6);
         assertEquals(
-                ParallelismChange.build(4),
+                ParallelismChange.build(4, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -188,7 +188,7 @@ public class JobVertexScalerTest {
         conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.build(5),
+                ParallelismChange.build(5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -200,7 +200,7 @@ public class JobVertexScalerTest {
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.build(4),
+                ParallelismChange.build(4, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -213,7 +213,7 @@ public class JobVertexScalerTest {
         conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.build(15),
+                ParallelismChange.build(15, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -225,7 +225,7 @@ public class JobVertexScalerTest {
 
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.build(16),
+                ParallelismChange.build(16, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -533,7 +533,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(5),
+                ParallelismChange.build(5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -593,7 +593,7 @@ public class JobVertexScalerTest {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        assertParallelismChange(10, 50, 100, ParallelismChange.build(5), delayedScaleDown);
+        assertParallelismChange(10, 50, 100, ParallelismChange.build(5, true), delayedScaleDown);
     }
 
     @Test
@@ -625,7 +625,8 @@ public class JobVertexScalerTest {
         // interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(60)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 700, 1000, ParallelismChange.build(90), delayedScaleDown);
+        assertParallelismChange(
+                100, 700, 1000, ParallelismChange.build(90, false), delayedScaleDown);
     }
 
     @Test
@@ -650,7 +651,8 @@ public class JobVertexScalerTest {
         // Allow immediate scale up
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 1700, 1000, ParallelismChange.build(170), delayedScaleDown);
+        assertParallelismChange(
+                100, 1700, 1000, ParallelismChange.build(170, true), delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
@@ -710,7 +712,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -725,7 +727,7 @@ public class JobVertexScalerTest {
         // Allow to scale higher if scaling was effective (80%)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20),
+                ParallelismChange.build(20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -767,7 +769,7 @@ public class JobVertexScalerTest {
         // Allow scale up if current parallelism doesn't match last (user rescaled manually)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20),
+                ParallelismChange.build(20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -780,7 +782,7 @@ public class JobVertexScalerTest {
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
-                ParallelismChange.build(36),
+                ParallelismChange.build(36, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -795,7 +797,7 @@ public class JobVertexScalerTest {
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         evaluated = evaluated(20, 180, 90);
         assertEquals(
-                ParallelismChange.build(40),
+                ParallelismChange.build(40, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -810,7 +812,7 @@ public class JobVertexScalerTest {
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -835,7 +837,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -850,7 +852,7 @@ public class JobVertexScalerTest {
         // Effective scale, no events triggered
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20),
+                ParallelismChange.build(20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -945,7 +947,7 @@ public class JobVertexScalerTest {
         // Test ineffective scaling switched off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         assertEquals(
-                ParallelismChange.build(40),
+                ParallelismChange.build(40, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1092,7 +1094,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
         // partition limited
         assertEquals(
-                ParallelismChange.build(15),
+                ParallelismChange.build(15, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1115,7 +1117,7 @@ public class JobVertexScalerTest {
         smallChangesForScaleFactor.put(
                 ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
         assertEquals(
-                ParallelismChange.build(15),
+                ParallelismChange.build(15, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index a3bc6746..1b1a10c1 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -50,7 +50,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -120,91 +119,104 @@ public class ScalingExecutorTest {
     }
 
     @Test
-    public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
+    public void testUtilizationBoundaries() {
         // Restart time should not affect utilization boundary
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
-        var op1 = new JobVertexID();
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+        var jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));
 
+        // 0.7 is outside of the utilization bound [0.6 - 0.0, 0.6 + 0.0], so scaling happens
         conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
+        var metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 60, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .hasSize(1);
 
-        var evaluated = Map.of(op1, evaluated(1, 70, 100));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
-
+        // Both 0.7 and 0.6 are within the utilization bound [0.6 - 0.2, 0.6 + 0.2], so
+        // scaling can be skipped.
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4);
-        evaluated = Map.of(op1, evaluated(1, 70, 100));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
-        assertTrue(getScaledParallelism(stateStore, context).isEmpty());
-
-        var op2 = new JobVertexID();
-        evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 85, 100));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 60, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
 
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        // 0.85 is outside of the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling happens
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .hasSize(2);
 
-        evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 70, 100));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        // Both 0.7 values are within the utilization bound [0.6 - 0.2, 0.6 + 0.2], so
+        // scaling can be skipped.
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 70, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
 
         // Test with backlog based scaling
-        evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
-    }
-
-    @Test
-    public void testUtilizationBoundariesWithOptionalVertex() {
-        // Restart time should not affect utilization boundary
-        var conf = context.getConfiguration();
-        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
-        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
-        var op1 = new JobVertexID();
-        var op2 = new JobVertexID();
-
-        // All vertices are optional
-        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
-        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
-        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
-
-        var evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 85, 100));
-
-        assertTrue(ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of()));
-
-        // One vertex is required, and it's out of range.
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
-
-        // One vertex is required, and it's within the range.
-        // The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
-        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
-        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
-        evaluated =
-                Map.of(
-                        op1, evaluated(1, 65, 100),
-                        op2, evaluated(1, 85, 100));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100, 15), sink, evaluated(10, 70, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .hasSize(2);
     }
 
     @Test
@@ -230,10 +242,6 @@ public class ScalingExecutorTest {
                         Map.of(vertex, evaluated(parallelism, targetRate, trueProcessingRate)),
                         dummyGlobalMetrics);
 
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated.getVertexMetrics(), evaluated.getVertexMetrics().keySet()));
-
         // Execute the full scaling path
         var now = Instant.now();
         var jobTopology =
@@ -260,83 +268,142 @@ public class ScalingExecutorTest {
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
-        var op1 = new JobVertexID();
-        var op2 = new JobVertexID();
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
+
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+        var jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));
 
-        // All vertices are optional
+        // target 0.6, target boundary 0.1 -> max 0.7, min 0.5
         conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
-        var evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 85, 100));
 
-        // target boundary 0.1, target 0.6, max 0.7, min 0.5
-        boolean boundaryOp1 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
-        boolean boundaryOp2 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
+        var metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(source, sink);
 
-        // Remove target boundary and use min max, should get the same result
+        // target 0.6, max 0.7, min 0.5
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5);
-        boolean minMaxOp1 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
-        boolean minMaxOp2 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
-        assertEquals(boundaryOp1, minMaxOp1);
-        assertEquals(boundaryOp2, minMaxOp2);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(source, sink);
 
         // When the target boundary parameter is used,
         // but the min max parameter is also set,
         // the min max parameter shall prevail.
+        // target 0.6, max 0.7, min 0.3
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
-        evaluated =
-                Map.of(
-                        op1, evaluated(2, 150, 100),
-                        op2, evaluated(1, 85, 100));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 60, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(sink);
 
-        // When the target boundary parameter is used,
-        // but the max parameter is also set,
-        conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+        // When the target boundary parameter and max parameter are set,
+        // we expect the min is derived from boundary.
+        // target 0.5, max 0.6, min 0
         conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+        conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
 
-        evaluated =
-                Map.of(
-                        op1, evaluated(2, 100, 99999),
-                        op2, evaluated(1, 80, 99999));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 100, 99999), sink, evaluated(10, 80, 99999)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
 
-        evaluated = Map.of(op2, evaluated(1, 85, 100));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 50, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(sink);
 
+        // When the target boundary parameter and min parameter are set,
+        // we expect the max is derived from boundary.
+        // target 0.5, max 1.0, min 0.3
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
         conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
 
-        evaluated =
-                Map.of(
-                        op1, evaluated(2, 80, 81),
-                        op2, evaluated(1, 100, 101));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 80, 81), sink, evaluated(10, 100, 101)),
+                        dummyGlobalMetrics);
 
-        evaluated = Map.of(op1, evaluated(1, 80, 79));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
+
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 80, 79), sink, evaluated(10, 100, 101)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(source, sink);
     }
 
     @Test
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index d4e0db9a..da2fa097 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -207,8 +207,9 @@ public abstract class AbstractAutoScalerStateStoreTest<
         stateStore.storeScalingTracking(ctx, scalingTracking);
 
         var delayedScaleDown = new DelayedScaleDown();
-        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
-        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
+        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10, true);
+        delayedScaleDown.triggerScaleDown(
+                new JobVertexID(), Instant.now().plusSeconds(10), 12, true);
 
         stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
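
Since the state store round-trips DelayedScaleDown through JSON (note the
@JsonCreator/@JsonProperty constructors and the @JsonIgnore on the new
accessor above), computed accessors must stay out of the serialized form. A
minimal sketch of that pattern, using a hypothetical stand-in class rather
than the real state classes:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonIgnoreExample {
        public static class Info {
            private final int parallelism;

            @JsonCreator
            public Info(@JsonProperty("parallelism") int parallelism) {
                this.parallelism = parallelism;
            }

            public int getParallelism() {
                return parallelism;
            }

            @JsonIgnore
            public int getDerivedValue() {
                return parallelism * 2; // computed, must not be persisted
            }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new Info(8));
            System.out.println(json); // {"parallelism":8} -- no derivedValue
            Info back = mapper.readValue(json, Info.class);
            System.out.println(back.getDerivedValue()); // 16
        }
    }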
 
