This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 968a5785 [FLINK-36018][autoscaler] Support lazy scale down to avoid 
frequent rescaling (#875)
968a5785 is described below

commit 968a578515d6269bbd5637594a7a342d74c1cd5c
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 6 16:55:18 2024 +0800

    [FLINK-36018][autoscaler] Support lazy scale down to avoid frequent 
rescaling (#875)
---
 .../generated/auto_scaler_configuration.html       |  12 +-
 e2e-tests/data/autoscaler.yaml                     |   1 +
 .../jdbc/state/JdbcAutoScalerStateStore.java       |  43 +++
 .../flink/autoscaler/jdbc/state/StateType.java     |   3 +-
 .../apache/flink/autoscaler/DelayedScaleDown.java  |  69 ++++
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |  13 +-
 .../apache/flink/autoscaler/JobVertexScaler.java   | 184 ++++++++---
 .../apache/flink/autoscaler/ScalingExecutor.java   |  67 ++--
 .../flink/autoscaler/config/AutoScalerOptions.java |  15 +-
 .../autoscaler/state/AutoScalerStateStore.java     |   7 +
 .../state/InMemoryAutoScalerStateStore.java        |  23 +-
 .../flink/autoscaler/BacklogBasedScalingTest.java  |   2 +-
 .../flink/autoscaler/JobVertexScalerTest.java      | 350 +++++++++++++++------
 .../MetricsCollectionAndEvaluationTest.java        |  15 +-
 .../autoscaler/RecommendedParallelismTest.java     |   2 +-
 .../flink/autoscaler/ScalingExecutorTest.java      | 149 ++++++---
 .../state/AbstractAutoScalerStateStoreTest.java    |  30 ++
 .../state/KubernetesAutoScalerStateStore.java      |  47 +++
 18 files changed, 802 insertions(+), 230 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index d9bed101..49ff71ef 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -146,18 +146,18 @@
             <td>Duration</td>
             <td>Maximum cap for the observed restart time when 
'job.autoscaler.restart.time-tracking.enabled' is set to true.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.scale-down.interval</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The delay time for scale down to be executed. If it is greater 
than 0, the scale down will be delayed. Delayed rescale can merge multiple 
scale downs within `scale-down.interval` into a scale down, thereby reducing 
the number of rescales. Reducing the frequency of job restarts can improve job 
availability. Scale down can be executed directly if it's less than or equal 
0.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.scale-down.max-factor</h5></td>
             <td style="word-wrap: break-word;">0.6</td>
             <td>Double</td>
             <td>Max scale down factor. 1 means no limit on scale down, 0.6 
means job can only be scaled down with 60% of the original parallelism.</td>
         </tr>
-        <tr>
-            <td><h5>job.autoscaler.scale-up.grace-period</h5></td>
-            <td style="word-wrap: break-word;">1 h</td>
-            <td>Duration</td>
-            <td>Duration in which no scale down of a vertex is allowed after 
it has been scaled up.</td>
-        </tr>
         <tr>
             <td><h5>job.autoscaler.scale-up.max-factor</h5></td>
             <td style="word-wrap: break-word;">100000.0</td>
diff --git a/e2e-tests/data/autoscaler.yaml b/e2e-tests/data/autoscaler.yaml
index 9deebe92..5e0b3043 100644
--- a/e2e-tests/data/autoscaler.yaml
+++ b/e2e-tests/data/autoscaler.yaml
@@ -40,6 +40,7 @@ spec:
     job.autoscaler.scaling.enabled: "true"
     job.autoscaler.stabilization.interval: "5s"
     job.autoscaler.metrics.window: "1m"
+    job.autoscaler.scale-down.interval: "0m"
 
 #    Invalid Validations for testing autoscaler configurations
 #    kubernetes.operator.job.autoscaler.scale-down.max-factor: "-0.6"
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 1224e0d4..5ac870fe 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler.jdbc.state;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
@@ -49,6 +50,7 @@ import java.util.TreeMap;
 
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.CONFIG_OVERRIDES;
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.DELAYED_SCALE_DOWN;
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
 import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
@@ -214,6 +216,35 @@ public class JdbcAutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
         jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
CONFIG_OVERRIDES);
     }
 
+    /**
+     * Persists the pending delayed scale down requests of the job under the {@code
+     * DELAYED_SCALE_DOWN} state type.
+     */
+    @Override
+    public void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown)
+            throws Exception {
+        var stateKey = getSerializeKey(jobContext);
+        var payload = serializeDelayedScaleDown(delayedScaleDown);
+        jdbcStateStore.putSerializedState(stateKey, DELAYED_SCALE_DOWN, payload);
+    }
+
+    /**
+     * Loads the delayed scale down requests of the job.
+     *
+     * <p>Returns an empty {@link DelayedScaleDown} when nothing is stored. A payload that cannot be
+     * deserialized is logged, removed from the store, and replaced by an empty instance.
+     */
+    @Nonnull
+    @Override
+    public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
+        Optional<String> storedPayload =
+                jdbcStateStore.getSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
+        if (storedPayload.isPresent()) {
+            try {
+                return deserializeDelayedScaleDown(storedPayload.get());
+            } catch (JacksonException e) {
+                LOG.error(
+                        "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
+                        e);
+                // Drop the unreadable payload so it is not re-read on every cycle.
+                jdbcStateStore.removeSerializedState(
+                        getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
+            }
+        }
+        return new DelayedScaleDown();
+    }
+
     @Override
     public void clearAll(Context jobContext) {
         jdbcStateStore.clearAll(getSerializeKey(jobContext));
@@ -296,4 +327,16 @@ public class JdbcAutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
             return null;
         }
     }
+
+    /** Serializes only the per-vertex first trigger times of {@code delayedScaleDown} as YAML. */
+    private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
+            throws JacksonException {
+        Map<JobVertexID, Instant> triggerTimes = delayedScaleDown.getFirstTriggerTime();
+        return YAML_MAPPER.writeValueAsString(triggerTimes);
+    }
+
+    /**
+     * Deserializes the YAML produced by {@link #serializeDelayedScaleDown(DelayedScaleDown)}.
+     *
+     * <p>A payload that decodes to {@code null} (e.g. a literal {@code null} document) yields an
+     * empty {@link DelayedScaleDown} rather than an instance backed by a {@code null} map, which
+     * would fail with an NPE on first use.
+     *
+     * @throws JacksonException if the payload is not a valid vertex-to-instant mapping
+     */
+    private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
+            throws JacksonException {
+        Map<JobVertexID, Instant> firstTriggerTime =
+                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
+        if (firstTriggerTime == null) {
+            return new DelayedScaleDown();
+        }
+        return new DelayedScaleDown(firstTriggerTime);
+    }
 }
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
index ca2864f4..fa3da3f4 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
@@ -27,7 +27,8 @@ public enum StateType {
     SCALING_TRACKING("scalingTracking"),
     COLLECTED_METRICS("collectedMetrics"),
     PARALLELISM_OVERRIDES("parallelismOverrides"),
-    CONFIG_OVERRIDES("configOverrides");
+    CONFIG_OVERRIDES("configOverrides"),
+    DELAYED_SCALE_DOWN("delayedScaleDown");
 
     /**
      * The identifier of each state type, it will be used to store. Please 
ensure the identifier is
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
new file mode 100644
index 00000000..c2402264
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Getter;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * All delayed scale down requests of a job.
+ *
+ * <p>For every vertex with a pending scale down, it records the time at which that scale down was
+ * first requested. The autoscaler compares this time against {@code scale-down.interval} to decide
+ * when the delayed scale down becomes required.
+ */
+public class DelayedScaleDown {
+
+    /** First trigger time of the pending (not yet executed) scale down, per vertex. */
+    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+
+    // Has any scale down request been updated? It doesn't need to be stored, it is only used to
+    // determine whether DelayedScaleDown needs to be stored.
+    @Getter private boolean isUpdated = false;
+
+    /** Creates an instance without any delayed scale down request. */
+    public DelayedScaleDown() {
+        this.firstTriggerTime = new HashMap<>();
+    }
+
+    /**
+     * Creates an instance from previously stored trigger times.
+     *
+     * <p>The given map is copied, so a read-only map (or {@code null}, treated as empty) handed in
+     * by a state store or deserializer cannot break the later in-place updates.
+     *
+     * @param firstTriggerTime first trigger time per vertex; may be {@code null}
+     */
+    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
+        this.firstTriggerTime =
+                firstTriggerTime == null ? new HashMap<>() : new HashMap<>(firstTriggerTime);
+    }
+
+    /** Returns the first trigger time of the pending scale down of {@code vertex}, if any. */
+    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
+        return Optional.ofNullable(firstTriggerTime.get(vertex));
+    }
+
+    /** Records {@code instant} as the first trigger time of {@code vertex}; marks as updated. */
+    void updateTriggerTime(JobVertexID vertex, Instant instant) {
+        firstTriggerTime.put(vertex, instant);
+        isUpdated = true;
+    }
+
+    /** Drops the pending scale down of {@code vertex}; marks as updated only if one existed. */
+    void clearVertex(JobVertexID vertex) {
+        Instant removed = firstTriggerTime.remove(vertex);
+        if (removed != null) {
+            isUpdated = true;
+        }
+    }
+
+    /** Drops all pending scale downs; marks as updated only if there was anything to drop. */
+    void clearAll() {
+        if (firstTriggerTime.isEmpty()) {
+            return;
+        }
+        firstTriggerTime.clear();
+        isUpdated = true;
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 19a23d5a..f3af3baa 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -214,9 +214,20 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
             return;
         }
 
+        var delayedScaleDown = stateStore.getDelayedScaleDown(ctx);
         var parallelismChanged =
                 scalingExecutor.scaleResource(
-                        ctx, evaluatedMetrics, scalingHistory, 
scalingTracking, now, jobTopology);
+                        ctx,
+                        evaluatedMetrics,
+                        scalingHistory,
+                        scalingTracking,
+                        now,
+                        jobTopology,
+                        delayedScaleDown);
+
+        if (delayedScaleDown.isUpdated()) {
+            stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
+        }
 
         if (parallelismChanged) {
             autoscalerMetrics.incrementScaling();
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 7442532a..5a43ad36 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,11 +38,12 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedMap;
 
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
@@ -51,6 +53,7 @@ import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
 public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
@@ -71,13 +74,79 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    public int computeScaleTargetParallelism(
+    /** The parallelism change type of {@link ParallelismChange}. */
+    public enum ParallelismChangeType {
+        /** The vertex keeps its current parallelism. */
+        NO_CHANGE,
+        /** The vertex must be rescaled; such a change can trigger a rescale of the job. */
+        REQUIRED_CHANGE,
+        /** The vertex is only rescaled if the job is rescaled because of a required change. */
+        OPTIONAL_CHANGE
+    }
+
+    /**
+     * The outcome of the parallelism computation for a single vertex: a change type plus the new
+     * parallelism ({@code -1} for {@link ParallelismChangeType#NO_CHANGE}).
+     *
+     * <p>The rescaling will be triggered if any vertex's ParallelismChange is required. This means
+     * that if all vertices' ParallelismChange is optional, rescaling will be ignored.
+     */
+    @Getter
+    public static class ParallelismChange {
+
+        private static final ParallelismChange NO_CHANGE =
+                new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
+
+        private final ParallelismChangeType changeType;
+        private final int newParallelism;
+
+        private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
+            this.changeType = changeType;
+            this.newParallelism = newParallelism;
+        }
+
+        /** A change to {@code newParallelism} that must be applied. */
+        public static ParallelismChange required(int newParallelism) {
+            return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
+        }
+
+        /** A change to {@code newParallelism} that is applied only alongside a required one. */
+        public static ParallelismChange optional(int newParallelism) {
+            return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
+        }
+
+        /** The shared instance signalling that the parallelism stays as it is. */
+        public static ParallelismChange noChange() {
+            return NO_CHANGE;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            var other = (ParallelismChange) o;
+            return newParallelism == other.newParallelism && changeType == other.changeType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(changeType, newParallelism);
+        }
+
+        @Override
+        public String toString() {
+            var text = new StringBuilder("ParallelismChange{");
+            text.append("changeType=").append(changeType);
+            text.append(", newParallelism=").append(newParallelism);
+            return text.append('}').toString();
+        }
+    }
+
+    public ParallelismChange computeScaleTargetParallelism(
             Context context,
             JobVertexID vertex,
             Collection<ShipStrategy> inputShipStrategies,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
-            Duration restartTime) {
+            Duration restartTime,
+            DelayedScaleDown delayedScaleDown) {
         var conf = context.getConfiguration();
         var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
         double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -85,7 +154,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             LOG.warn(
                     "True processing rate is not available for {}, cannot 
compute new parallelism",
                     vertex);
-            return currentParallelism;
+            return ParallelismChange.noChange();
         }
 
         double targetCapacity =
@@ -95,7 +164,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             LOG.warn(
                     "Target data rate is not available for {}, cannot compute 
new parallelism",
                     vertex);
-            return currentParallelism;
+            return ParallelismChange.noChange();
         }
 
         LOG.debug("Target processing capacity for {} is {}", vertex, 
targetCapacity);
@@ -131,65 +200,92 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                         Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, 
conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is 
equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismChange.noChange();
         }
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
                 ScalingMetric.EXPECTED_PROCESSING_RATE,
                 EvaluatedScalingMetric.of(cappedTargetCapacity));
-        return newParallelism;
+
+        return detectBlockScaling(
+                context,
+                vertex,
+                conf,
+                evaluatedMetrics,
+                history,
+                currentParallelism,
+                newParallelism,
+                delayedScaleDown);
     }
 
-    private boolean blockScalingBasedOnPastActions(
+    private ParallelismChange detectBlockScaling(
             Context context,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             int currentParallelism,
-            int newParallelism) {
-
-        // If we don't have past scaling actions for this vertex, there is 
nothing to do
-        if (history.isEmpty()) {
-            return false;
-        }
-
-        boolean scaledUp = currentParallelism < newParallelism;
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
+            int newParallelism,
+            DelayedScaleDown delayedScaleDown) {
+        checkArgument(
+                currentParallelism != newParallelism,
+                "The newParallelism is equal to currentParallelism, no scaling 
is needed. This is probably a bug.");
+
+        var scaledUp = currentParallelism < newParallelism;
+
+        if (scaledUp) {
+            // Clear delayed scale down request if the new parallelism is 
greater than
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+
+            // If we don't have past scaling actions for this vertex, don't 
block scale up.
+            if (history.isEmpty()) {
+                return ParallelismChange.required(newParallelism);
+            }
 
-        if (currentParallelism == lastSummary.getNewParallelism() && 
lastSummary.isScaledUp()) {
-            if (scaledUp) {
-                return detectIneffectiveScaleUp(
-                        context, vertex, conf, evaluatedMetrics, lastSummary);
-            } else {
-                return detectImmediateScaleDownAfterScaleUp(vertex, conf, 
lastScalingTs);
+            var lastSummary = history.get(history.lastKey());
+            if (currentParallelism == lastSummary.getNewParallelism()
+                    && lastSummary.isScaledUp()
+                    && detectIneffectiveScaleUp(
+                            context, vertex, conf, evaluatedMetrics, 
lastSummary)) {
+                // Block scale up when last rescale is ineffective.
+                return ParallelismChange.noChange();
             }
+
+            return ParallelismChange.required(newParallelism);
+        } else {
+            return applyScaleDownInterval(delayedScaleDown, vertex, conf, 
newParallelism);
         }
-        return false;
     }
 
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
+    private ParallelismChange applyScaleDownInterval(
+            DelayedScaleDown delayedScaleDown,
+            JobVertexID vertex,
+            Configuration conf,
+            int newParallelism) {
+        var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
+        if (scaleDownInterval.toMillis() <= 0) {
+            // The scale down interval is disabled, so don't block scaling.
+            return ParallelismChange.required(newParallelism);
+        }
 
-        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
-        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
-            LOG.info(
-                    "Skipping immediate scale down after scale up within grace 
period for {}",
-                    vertex);
-            return true;
+        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
+        if (firstTriggerTime.isEmpty()) {
+            LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
+            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
+            return ParallelismChange.optional(newParallelism);
+        }
+
+        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
+            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
+            return ParallelismChange.optional(newParallelism);
         } else {
-            return false;
+            return ParallelismChange.required(newParallelism);
         }
     }
 
@@ -258,7 +354,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             double scaleFactor,
             int parallelismLowerLimit,
             int parallelismUpperLimit) {
-        Preconditions.checkArgument(
+        checkArgument(
                 parallelismLowerLimit <= parallelismUpperLimit,
                 "The parallelism lower limitation must not be greater than the 
parallelism upper limitation.");
         if (parallelismLowerLimit > maxParallelism) {
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index af325371..02e5ad4f 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -45,9 +45,13 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 
+import static 
org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
+import static 
org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -100,25 +104,26 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
             ScalingTracking scalingTracking,
             Instant now,
-            JobTopology jobTopology)
+            JobTopology jobTopology,
+            DelayedScaleDown delayedScaleDown)
             throws Exception {
         var conf = context.getConfiguration();
         var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
 
         var scalingSummaries =
                 computeScalingSummary(
-                        context, evaluatedMetrics, scalingHistory, 
restartTime, jobTopology);
+                        context,
+                        evaluatedMetrics,
+                        scalingHistory,
+                        restartTime,
+                        jobTopology,
+                        delayedScaleDown);
 
         if (scalingSummaries.isEmpty()) {
             LOG.info("All job vertices are currently running at their target 
parallelism.");
             return false;
         }
 
-        if (allVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), scalingSummaries)) {
-            return false;
-        }
-
         updateRecommendedParallelism(evaluatedMetrics.getVertexMetrics(), 
scalingSummaries);
 
         if (checkIfBlockedAndTriggerScalingEvent(context, scalingSummaries, 
conf, now)) {
@@ -156,6 +161,9 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
         autoScalerStateStore.storeConfigChanges(context, configOverrides);
 
+        // Try to clear all delayed scale down requests after scaling.
+        delayedScaleDown.clearAll();
+
         return true;
     }
 
@@ -172,12 +180,16 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                                                 
scalingSummary.getNewParallelism())));
     }
 
-    protected static boolean allVerticesWithinUtilizationTarget(
+    @VisibleForTesting
+    static boolean allRequiredVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics,
-            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+            Set<JobVertexID> requiredVertices) {
+        // All vertices' ParallelismChange is optional, rescaling will be 
ignored.
+        if (requiredVertices.isEmpty()) {
+            return true;
+        }
 
-        for (Map.Entry<JobVertexID, ScalingSummary> entry : 
scalingSummaries.entrySet()) {
-            var vertex = entry.getKey();
+        for (JobVertexID vertex : requiredVertices) {
             var metrics = evaluatedMetrics.get(vertex);
 
             double trueProcessingRate = 
metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -212,7 +224,8 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
             Duration restartTime,
-            JobTopology jobTopology) {
+            JobTopology jobTopology,
+            DelayedScaleDown delayedScaleDown) {
         LOG.debug("Restart time used in scaling summary computation: {}", 
restartTime);
 
         if (isJobUnderMemoryPressure(context, 
evaluatedMetrics.getGlobalMetrics())) {
@@ -221,6 +234,8 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         }
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
+        var requiredVertices = new HashSet<JobVertexID>();
+
         var excludeVertexIdList =
                 
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
         evaluatedMetrics
@@ -235,7 +250,7 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                                 var currentParallelism =
                                         (int) 
metrics.get(ScalingMetric.PARALLELISM).getCurrent();
 
-                                var newParallelism =
+                                var parallelismChange =
                                         
jobVertexScaler.computeScaleTargetParallelism(
                                                 context,
                                                 v,
@@ -243,15 +258,29 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                                                 metrics,
                                                 scalingHistory.getOrDefault(
                                                         v, 
Collections.emptySortedMap()),
-                                                restartTime);
-                                if (currentParallelism != newParallelism) {
-                                    out.put(
-                                            v,
-                                            new ScalingSummary(
-                                                    currentParallelism, 
newParallelism, metrics));
+                                                restartTime,
+                                                delayedScaleDown);
+                                if (NO_CHANGE == 
parallelismChange.getChangeType()) {
+                                    return;
+                                } else if (REQUIRED_CHANGE == 
parallelismChange.getChangeType()) {
+                                    requiredVertices.add(v);
                                 }
+                                out.put(
+                                        v,
+                                        new ScalingSummary(
+                                                currentParallelism,
+                                                
parallelismChange.getNewParallelism(),
+                                                metrics));
                             }
                         });
+
+        // Skip scaling if the utilization of every vertex with a required change is within the
+        // target range. Consequently, optional changes alone never trigger a rescale.
+        if (allRequiredVerticesWithinUtilizationTarget(
+                evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
+            return Map.of();
+        }
+
         return out;
     }
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 6922448b..e1ea6a86 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -102,13 +102,20 @@ public class AutoScalerOptions {
                     
.withFallbackKeys(oldOperatorConfigKey("target.utilization.boundary"))
                     .withDescription(
                             "Target vertex utilization boundary. Scaling won't 
be performed if the processing capacity is within [target_rate / 
(target_utilization - boundary), (target_rate / (target_utilization + 
boundary)]");
-    public static final ConfigOption<Duration> SCALE_UP_GRACE_PERIOD =
-            autoScalerConfig("scale-up.grace-period")
+
+    public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
+            autoScalerConfig("scale-down.interval")
                     .durationType()
                     .defaultValue(Duration.ofHours(1))
-                    
.withFallbackKeys(oldOperatorConfigKey("scale-up.grace-period"))
+                    
.withDeprecatedKeys(autoScalerConfigKey("scale-up.grace-period"))
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("scale-up.grace-period"),
+                            oldOperatorConfigKey("scale-down.interval"))
                     .withDescription(
-                            "Duration in which no scale down of a vertex is 
allowed after it has been scaled up.");
+                            "The delay time for scale down to be executed. If 
it is greater than 0, the scale down will be delayed. "
+                                    + "Delayed rescale can merge multiple 
scale downs within `scale-down.interval` into a scale down, thereby reducing 
the number of rescales. "
+                                    + "Reducing the frequency of job restarts 
can improve job availability. "
+                                    + "Scale down can be executed directly if 
it's less than or equal 0.");
 
     public static final ConfigOption<Integer> VERTEX_MIN_PARALLELISM =
             autoScalerConfig("vertex.min-parallelism")
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 7fb369cf..15581b75 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -18,6 +18,7 @@
 package org.apache.flink.autoscaler.state;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
@@ -77,6 +78,12 @@ public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
 
     void removeConfigChanges(Context jobContext) throws Exception;
 
+    void storeDelayedScaleDown(Context jobContext, DelayedScaleDown 
delayedScaleDown)
+            throws Exception;
+
+    @Nonnull
+    DelayedScaleDown getDelayedScaleDown(Context jobContext) throws Exception;
+
     /** Removes all data from this context. Flush stil needs to be called. */
     void clearAll(Context jobContext) throws Exception;
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 35084db3..cf6aa74f 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.state;
 
+import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
@@ -54,12 +55,15 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
 
     private final Map<KEY, ScalingTracking> scalingTrackingStore;
 
+    private final Map<KEY, DelayedScaleDown> delayedScaleDownStore;
+
     public InMemoryAutoScalerStateStore() {
         scalingHistoryStore = new ConcurrentHashMap<>();
         collectedMetricsStore = new ConcurrentHashMap<>();
         parallelismOverridesStore = new ConcurrentHashMap<>();
         scalingTrackingStore = new ConcurrentHashMap<>();
-        tmConfigOverrides = new ConcurrentHashMap<KEY, ConfigChanges>();
+        tmConfigOverrides = new ConcurrentHashMap<>();
+        delayedScaleDownStore = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -69,6 +73,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         scalingHistoryStore.put(jobContext.getJobKey(), scalingHistory);
     }
 
+    @Nonnull
     @Override
     public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getScalingHistory(
             Context jobContext) {
@@ -98,6 +103,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         collectedMetricsStore.put(jobContext.getJobKey(), metrics);
     }
 
+    @Nonnull
     @Override
     public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context 
jobContext) {
         return 
Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey()))
@@ -115,6 +121,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         parallelismOverridesStore.put(jobContext.getJobKey(), 
parallelismOverrides);
     }
 
+    @Nonnull
     @Override
     public Map<String, String> getParallelismOverrides(Context jobContext) {
         return 
Optional.ofNullable(parallelismOverridesStore.get(jobContext.getJobKey()))
@@ -143,12 +150,26 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         parallelismOverridesStore.remove(jobContext.getJobKey());
     }
 
+    @Override
+    public void storeDelayedScaleDown(Context jobContext, DelayedScaleDown 
delayedScaleDown) {
+        delayedScaleDownStore.put(jobContext.getJobKey(), delayedScaleDown);
+    }
+
+    @Nonnull
+    @Override
+    public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
+        return 
Optional.ofNullable(delayedScaleDownStore.get(jobContext.getJobKey()))
+                .orElse(new DelayedScaleDown());
+    }
+
     @Override
     public void clearAll(Context jobContext) {
         scalingHistoryStore.remove(jobContext.getJobKey());
         parallelismOverridesStore.remove(jobContext.getJobKey());
         collectedMetricsStore.remove(jobContext.getJobKey());
         tmConfigOverrides.remove(jobContext.getJobKey());
+        scalingTrackingStore.remove(jobContext.getJobKey());
+        delayedScaleDownStore.remove(jobContext.getJobKey());
     }
 
     @Override
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 3fde1640..0aae09c9 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -97,7 +97,7 @@ public class BacklogBasedScalingTest {
         defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
-        defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, 
Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, 
Duration.ofSeconds(1));
 
         autoscaler =
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 7856a3e7..17ade6c8 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobVertexScaler.ParallelismChange;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
@@ -36,6 +37,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
+import java.time.ZoneId;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -48,7 +50,6 @@ import static 
org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for vertex parallelism scaler logic. */
@@ -57,6 +58,8 @@ public class JobVertexScalerTest {
     private static final Collection<ShipStrategy> NOT_ADJUST_INPUTS =
             List.of(ShipStrategy.REBALANCE, ShipStrategy.RESCALE);
 
+    private final JobVertexID vertex = new JobVertexID();
+
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
     private JobVertexScaler<JobID, JobAutoScalerContext<JobID>> vertexScaler;
     private JobAutoScalerContext<JobID> context;
@@ -95,127 +98,139 @@ public class JobVertexScalerTest {
     public void testParallelismScaling(Collection<ShipStrategy> 
inputShipStrategies) {
         var op = new JobVertexID();
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                5,
+                ParallelismChange.optional(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 50, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                8,
+                ParallelismChange.optional(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 50, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                10,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 80, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                8,
+                ParallelismChange.optional(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 60, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         assertEquals(
-                8,
+                ParallelismChange.optional(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 59, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
         assertEquals(
-                10,
+                ParallelismChange.required(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(2, 100, 40),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
         assertEquals(
-                4,
+                ParallelismChange.required(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(2, 100, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
-                5,
+                ParallelismChange.optional(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 10, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
-                4,
+                ParallelismChange.optional(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 10, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
-                15,
+                ParallelismChange.required(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 200, 10),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
         assertEquals(
-                16,
+                ParallelismChange.required(16),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
                         inputShipStrategies,
                         evaluated(10, 200, 10),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
     }
 
     @Test
@@ -319,95 +334,179 @@ public class JobVertexScalerTest {
     @Test
     public void testMinParallelismLimitIsUsed() {
         conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        var delayedScaleDown = new DelayedScaleDown();
+
         assertEquals(
-                5,
+                ParallelismChange.optional(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
                         NOT_ADJUST_INPUTS,
                         evaluated(10, 100, 500),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         // Make sure we respect current parallelism in case it's lower
         assertEquals(
-                4,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
                         NOT_ADJUST_INPUTS,
                         evaluated(4, 100, 500),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
     }
 
     @Test
     public void testMaxParallelismLimitIsUsed() {
         conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        var delayedScaleDown = new DelayedScaleDown();
+
         assertEquals(
-                10,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
                         NOT_ADJUST_INPUTS,
                         evaluated(10, 500, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
 
         // Make sure we respect current parallelism in case it's higher
         assertEquals(
-                12,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
                         NOT_ADJUST_INPUTS,
                         evaluated(12, 500, 100),
                         Collections.emptySortedMap(),
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
     }
 
     @Test
-    public void testScaleDownAfterScaleUpDetection() {
-        var op = new JobVertexID();
+    public void testDisableScaleDownInterval() {
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, 
Duration.ofMinutes(1));
-        var clock = Clock.systemDefaultZone();
-        vertexScaler.setClock(clock);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));
 
-        var evaluated = evaluated(5, 100, 50);
-        var history = new TreeMap<Instant, ScalingSummary>();
-        assertEquals(
-                10,
-                vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+        var delayedScaleDown = new DelayedScaleDown();
 
-        history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
+        assertParallelismChange(10, 50, 100, ParallelismChange.required(5), 
delayedScaleDown);
+    }
 
-        // Should not allow scale back down immediately
-        evaluated = evaluated(10, 50, 100);
-        assertEquals(
-                10,
-                vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+    @Test
+    public void testRequiredScaleDownAfterInterval() {
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+        var instant = Instant.now();
 
-        // Pass some time...
-        clock = Clock.offset(Clock.systemDefaultZone(), 
Duration.ofSeconds(61));
-        vertexScaler.setClock(clock);
+        var delayedScaleDown = new DelayedScaleDown();
 
-        assertEquals(
-                5,
-                vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
-        history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
+        // The scale down shouldn't be required.
+        vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.optional(80), delayedScaleDown);
+
+        // Within scale down interval.
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 900, 1000, 
ParallelismChange.optional(90), delayedScaleDown);
+
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(20)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 820, 1000, 
ParallelismChange.optional(82), delayedScaleDown);
+
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(40)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 720, 1000, 
ParallelismChange.optional(72), delayedScaleDown);
+
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(50)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 600, 1000, 
ParallelismChange.optional(60), delayedScaleDown);
+
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(59)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 640, 1000, 
ParallelismChange.optional(64), delayedScaleDown);
+
+        // The scale down is required after the scale down interval ends.
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(60)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 700, 1000, 
ParallelismChange.required(70), delayedScaleDown);
+    }
+
+    @Test
+    public void testImmediateScaleUpWithinScaleDownInterval() {
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+        var instant = Instant.now();
+
+        var delayedScaleDown = new DelayedScaleDown();
+
+        // The scale down shouldn't be required.
+        vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.optional(80), delayedScaleDown);
+        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+
+        // Within scale down interval.
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 900, 1000, 
ParallelismChange.optional(90), delayedScaleDown);
+        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
 
         // Allow immediate scale up
-        evaluated = evaluated(5, 100, 50);
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(12)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 1700, 1000, 
ParallelismChange.required(170), delayedScaleDown);
+        assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+    }
+
+    @Test
+    public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+        var instant = Instant.now();
+
+        var delayedScaleDown = new DelayedScaleDown();
+
+        // The scale down shouldn't be required.
+        vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.optional(80), delayedScaleDown);
+        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+
+        // Within scale down interval.
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 900, 1000, 
ParallelismChange.optional(90), delayedScaleDown);
+        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+
+        // The delayed scale down is canceled when new parallelism is same 
with current parallelism.
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(12)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 1000, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+    }
+
+    private void assertParallelismChange(
+            int parallelism,
+            int targetDataRate,
+            int trueProcessingRate,
+            ParallelismChange expectedParallelismChange,
+            DelayedScaleDown delayedScaleDown) {
         assertEquals(
-                10,
+                expectedParallelismChange,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
-        history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
+                        context,
+                        vertex,
+                        NOT_ADJUST_INPUTS,
+                        evaluated(parallelism, targetDataRate, 
trueProcessingRate),
+                        new TreeMap<>(),
+                        restartTime,
+                        delayedScaleDown));
     }
 
     @Test
@@ -415,23 +514,37 @@ public class JobVertexScalerTest {
         var op = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
         var evaluated = evaluated(5, 100, 50);
         var history = new TreeMap<Instant, ScalingSummary>();
+        var delayedScaleDown = new DelayedScaleDown();
+
         assertEquals(
-                10,
+                ParallelismChange.required(10),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(100, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
         // Allow to scale higher if scaling was effective (80%)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                20,
+                ParallelismChange.required(20),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(180, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
 
@@ -439,50 +552,84 @@ public class JobVertexScalerTest {
         // 90 -> 94. Do not try to scale above 20
         evaluated = evaluated(20, 180, 94);
         assertEquals(
-                20,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
-        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
 
         // Still considered ineffective (less than <10%)
         evaluated = evaluated(20, 180, 98);
         assertEquals(
-                20,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
-        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
 
         // Allow scale up if current parallelism doesnt match last (user 
rescaled manually)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                20,
+                ParallelismChange.required(20),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
 
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
-                36,
+                ParallelismChange.required(36),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Ineffective but detection is turned off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
false);
         evaluated = evaluated(20, 180, 90);
         assertEquals(
-                40,
+                ParallelismChange.required(40),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
 
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
         assertEquals(
-                10,
+                ParallelismChange.required(10),
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
+                        context,
+                        op,
+                        NOT_ADJUST_INPUTS,
+                        evaluated,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
     }
 
@@ -492,33 +639,37 @@ public class JobVertexScalerTest {
         var jobVertexID = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
-        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
         var evaluated = evaluated(5, 100, 50);
         var history = new TreeMap<Instant, ScalingSummary>();
+        var delayedScaleDown = new DelayedScaleDown();
+
         assertEquals(
-                10,
+                ParallelismChange.required(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(100, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
         // Effective scale, no events triggered
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                20,
+                ParallelismChange.required(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(180, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
         assertEquals(0, eventCollector.events.size());
@@ -526,15 +677,15 @@ public class JobVertexScalerTest {
         // Ineffective scale, an event is triggered
         evaluated = evaluated(20, 180, 95);
         assertEquals(
-                20,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
-        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(1, eventCollector.events.size());
         var event = eventCollector.events.poll();
         assertThat(event).isNotNull();
@@ -552,15 +703,15 @@ public class JobVertexScalerTest {
                 ScalingMetric.TRUE_PROCESSING_RATE,
                 EvaluatedScalingMetric.avg(tpr.getAverage() + 0.01));
         assertEquals(
-                20,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
-        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(0, eventCollector.events.size());
 
         // reset tpr
@@ -569,29 +720,29 @@ public class JobVertexScalerTest {
         // Repeat ineffective scale with postive interval, no event is 
triggered
         conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, 
Duration.ofSeconds(1800));
         assertEquals(
-                20,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
-        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(0, eventCollector.events.size());
 
         // Ineffective scale with interval set to 0, an event is triggered
         conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
         assertEquals(
-                20,
+                ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
-        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(1, eventCollector.events.size());
         event = eventCollector.events.poll();
         assertThat(event).isNotNull();
@@ -605,14 +756,15 @@ public class JobVertexScalerTest {
         // Test ineffective scaling switched off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
false);
         assertEquals(
-                40,
+                ParallelismChange.required(40),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
                         inputShipStrategies,
                         evaluated,
                         history,
-                        restartTime));
+                        restartTime,
+                        delayedScaleDown));
         assertEquals(1, eventCollector.events.size());
         event = eventCollector.events.poll();
         assertThat(event).isNotNull();
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index ade85b6e..1d3d9bff 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -187,7 +187,8 @@ public class MetricsCollectionAndEvaluationTest {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                topology);
+                topology,
+                new DelayedScaleDown());
 
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.size());
@@ -364,7 +365,7 @@ public class MetricsCollectionAndEvaluationTest {
     public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception {
         var topology = new JobTopology(new VertexInfo(source1, Map.of(), 5, 
720));
 
-        metricsCollector = new TestingMetricsCollector(topology);
+        metricsCollector = new TestingMetricsCollector<>(topology);
         metricsCollector.setJobUpdateTs(startTime);
 
         metricsCollector.updateMetrics(
@@ -378,6 +379,7 @@ public class MetricsCollectionAndEvaluationTest {
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
 
         metricsCollector.setClock(clock);
 
@@ -424,7 +426,8 @@ public class MetricsCollectionAndEvaluationTest {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                topology);
+                topology,
+                new DelayedScaleDown());
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
     }
@@ -634,6 +637,9 @@ public class MetricsCollectionAndEvaluationTest {
         metricsCollector = new TestingMetricsCollector<>(topology);
         metricsCollector.setJobUpdateTs(startTime);
 
+        var conf = context.getConfiguration();
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
+
         metricsCollector.updateMetrics(
                 source1,
                 TestMetrics.builder()
@@ -681,7 +687,8 @@ public class MetricsCollectionAndEvaluationTest {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                topology);
+                topology,
+                new DelayedScaleDown());
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 197cb94c..4b35d72a 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -88,7 +88,7 @@ public class RecommendedParallelismTest {
         defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
-        defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, 
Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
         autoscaler =
                 new JobAutoScalerImpl<>(
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 97972269..50e5870a 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -82,7 +83,7 @@ public class ScalingExecutorTest {
 
     private Configuration capturedConfForMaxResources;
 
-    private ScalingTracking scalingTracking = new ScalingTracking();
+    private final ScalingTracking scalingTracking = new ScalingTracking();
 
     private static final Map<ScalingMetric, EvaluatedScalingMetric> 
dummyGlobalMetrics =
             Map.of(
@@ -119,7 +120,7 @@ public class ScalingExecutorTest {
     }
 
     @Test
-    public void testUtilizationBoundaries() throws Exception {
+    public void testUtilizationBoundariesForAllRequiredVertices() throws 
Exception {
         // Restart time should not affect utilization boundary
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
@@ -131,13 +132,15 @@ public class ScalingExecutorTest {
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
 
         var evaluated = Map.of(op1, evaluated(1, 70, 100));
-        var scalingSummary = Map.of(op1, new ScalingSummary(2, 1, 
evaluated.get(op1)));
-        
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
+        assertFalse(
+                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
         evaluated = Map.of(op1, evaluated(1, 70, 100));
-        scalingSummary = Map.of(op1, new ScalingSummary(2, 1, 
evaluated.get(op1)));
-        
assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
+        assertTrue(
+                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
         assertTrue(getScaledParallelism(stateStore, context).isEmpty());
 
         var op2 = new JobVertexID();
@@ -145,31 +148,59 @@ public class ScalingExecutorTest {
                 Map.of(
                         op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 85, 100));
-        scalingSummary =
-                Map.of(
-                        op1,
-                        new ScalingSummary(1, 2, evaluated.get(op1)),
-                        op2,
-                        new ScalingSummary(1, 2, evaluated.get(op2)));
 
-        
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
+        assertFalse(
+                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
 
         evaluated =
                 Map.of(
                         op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 70, 100));
-        scalingSummary =
-                Map.of(
-                        op1,
-                        new ScalingSummary(1, 2, evaluated.get(op1)),
-                        op2,
-                        new ScalingSummary(1, 2, evaluated.get(op2)));
-        
assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
+        assertTrue(
+                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
 
         // Test with backlog based scaling
         evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
-        scalingSummary = Map.of(op1, new ScalingSummary(1, 2, 
evaluated.get(op1)));
-        
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
+        assertFalse(
+                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
+    }
+
+    @Test
+    public void testUtilizationBoundariesWithOptionalVertex() {
+        // Restart time should not affect utilization boundary
+        var conf = context.getConfiguration();
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        var op1 = new JobVertexID();
+        var op2 = new JobVertexID();
+
+        // All vertices are optional
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+        var evaluated =
+                Map.of(
+                        op1, evaluated(1, 70, 100),
+                        op2, evaluated(1, 85, 100));
+
+        
assertTrue(ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated,
 Set.of()));
+
+        // One vertex is required, and it's out of range.
+        assertFalse(
+                
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, 
Set.of(op1)));
+
+        // One vertex is required, and it's within the range.
+        // The op2 is optional, so it shouldn't affect the scaling even if it 
is out of range,
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        evaluated =
+                Map.of(
+                        op1, evaluated(1, 65, 100),
+                        op2, evaluated(1, 85, 100));
+        assertTrue(
+                
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, 
Set.of(op1)));
     }
 
     @Test
@@ -193,17 +224,9 @@ public class ScalingExecutorTest {
                         Map.of(vertex, evaluated(parallelism, targetRate, 
trueProcessingRate)),
                         dummyGlobalMetrics);
 
-        // Verify precondition
-        var scalingSummary =
-                Map.of(
-                        vertex,
-                        new ScalingSummary(
-                                parallelism,
-                                expectedParallelism,
-                                evaluated.getVertexMetrics().get(vertex)));
         assertTrue(
-                ScalingExecutor.allVerticesWithinUtilizationTarget(
-                        evaluated.getVertexMetrics(), scalingSummary));
+                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                        evaluated.getVertexMetrics(), 
evaluated.getVertexMetrics().keySet()));
 
         // Execute the full scaling path
         var now = Instant.now();
@@ -217,7 +240,13 @@ public class ScalingExecutorTest {
                                 new IOMetrics(10000, 10000, 100)));
         assertFalse(
                 scalingExecutor.scaleResource(
-                        context, evaluated, new HashMap<>(), scalingTracking, 
now, jobTopology));
+                        context,
+                        evaluated,
+                        new HashMap<>(),
+                        scalingTracking,
+                        now,
+                        jobTopology,
+                        new DelayedScaleDown()));
     }
 
     @Test
@@ -236,6 +265,7 @@ public class ScalingExecutorTest {
                         new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 
1000, false, null));
 
         var conf = context.getConfiguration();
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         var metrics =
                 new EvaluatedMetrics(
@@ -250,6 +280,7 @@ public class ScalingExecutorTest {
         // filter operator should not scale
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, 
List.of(filterOperatorHexString));
         var now = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
         assertFalse(
                 scalingExecutor.scaleResource(
                         context,
@@ -257,7 +288,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         // filter operator should scale
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
         assertTrue(
@@ -267,7 +299,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
     }
 
     @Test
@@ -297,6 +330,7 @@ public class ScalingExecutorTest {
                 new EvaluatedMetrics(
                         Map.of(source, evaluated(10, 110, 100), sink, 
evaluated(10, 110, 100)),
                         dummyGlobalMetrics);
+        var delayedScaleDown = new DelayedScaleDown();
         assertFalse(
                 scalingExecutor.scaleResource(
                         context,
@@ -304,7 +338,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         // scaling execution outside excluded periods
         excludedPeriod =
                 new 
StringBuilder(localTime.plusSeconds(100).toString().split("\\.")[0])
@@ -319,7 +354,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
     }
 
     @Test
@@ -347,6 +383,7 @@ public class ScalingExecutorTest {
                                 EvaluatedScalingMetric.of(Double.NaN)));
 
         // Would normally scale without resource usage check
+        var delayedScaleDown = new DelayedScaleDown();
         assertTrue(
                 scalingExecutor.scaleResource(
                         context,
@@ -354,7 +391,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
 
         scalingExecutor =
                 new ScalingExecutor<>(
@@ -379,7 +417,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
     }
 
     @ParameterizedTest
@@ -426,7 +465,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        new DelayedScaleDown()));
         Map<String, String> expected;
         if (memoryTuningEnabled) {
             assertNotEquals(context.getConfiguration(), 
capturedConfForMaxResources);
@@ -474,6 +514,7 @@ public class ScalingExecutorTest {
     }
 
     private void testScalingEvents(boolean scalingEnabled, Duration interval) 
throws Exception {
+        var delayedScaleDown = new DelayedScaleDown();
         var jobVertexID = new JobVertexID();
 
         JobTopology jobTopology =
@@ -497,7 +538,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         assertEquals(
                 scalingEnabled,
                 scalingExecutor.scaleResource(
@@ -506,7 +548,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         int expectedSize = (interval == null || interval.toMillis() > 0) && 
!scalingEnabled ? 1 : 2;
         assertEquals(expectedSize, eventCollector.events.size());
 
@@ -545,7 +588,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         var event2 = eventCollector.events.poll();
         assertThat(event2).isNotNull();
         assertThat(event2.getContext()).isSameAs(event.getContext());
@@ -555,6 +599,7 @@ public class ScalingExecutorTest {
 
     @Test
     public void testScalingUnderGcPressure() throws Exception {
+        var delayedScaleDown = new DelayedScaleDown();
         var jobVertexID = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_ENABLED, true);
         conf.set(AutoScalerOptions.GC_PRESSURE_THRESHOLD, 0.5);
@@ -580,7 +625,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
 
         // Just below the thresholds
         metrics =
@@ -598,7 +644,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
 
         eventCollector.events.clear();
 
@@ -618,7 +665,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         assertEquals("MemoryPressure", 
eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
 
@@ -638,7 +686,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        jobTopology));
+                        jobTopology,
+                        delayedScaleDown));
         assertEquals("MemoryPressure", 
eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
     }
@@ -692,7 +741,8 @@ public class ScalingExecutorTest {
                                 new HashMap<>(),
                                 new ScalingTracking(),
                                 now,
-                                jobTopology))
+                                jobTopology,
+                                new DelayedScaleDown()))
                 .isTrue();
 
         Map<String, String> parallelismOverrides = 
stateStore.getParallelismOverrides(context);
@@ -816,7 +866,8 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        jobTopology));
+                        jobTopology,
+                        new DelayedScaleDown()));
         if (quotaReached) {
             assertEquals("ScalingReport", 
eventCollector.events.poll().getReason());
             assertEquals("ResourceQuotaReached", 
eventCollector.events.poll().getReason());
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index da0f0c2c..3dff8406 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.autoscaler.state;
 
+import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingRecord;
 import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.tuning.ConfigChanges;
@@ -176,6 +179,13 @@ public abstract class AbstractAutoScalerStateStoreTest<
 
     @Test
     protected void testDiscardAllState() throws Exception {
+        assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+        assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
+        
assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
+        
assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
+
         stateStore.storeCollectedMetrics(
                 ctx, new TreeMap<>(Map.of(Instant.now(), new 
CollectedMetrics())));
         stateStore.storeScalingHistory(
@@ -187,10 +197,25 @@ public abstract class AbstractAutoScalerStateStoreTest<
         stateStore.storeConfigChanges(
                 ctx, new ConfigChanges().addOverride("config.value", "value"));
 
+        var scalingTracking = new ScalingTracking();
+        scalingTracking.addScalingRecord(
+                Instant.now().minus(Duration.ofHours(3)), new ScalingRecord());
+        scalingTracking.addScalingRecord(
+                Instant.now().minus(Duration.ofHours(2)), new ScalingRecord());
+        scalingTracking.addScalingRecord(
+                Instant.now().minus(Duration.ofHours(1)), new ScalingRecord());
+        stateStore.storeScalingTracking(ctx, scalingTracking);
+
+        var firstTriggerTime = Map.of(new JobVertexID(), Instant.now());
+        stateStore.storeDelayedScaleDown(ctx, new 
DelayedScaleDown(firstTriggerTime));
+
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
         
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
+        
assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
+        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
+                .isEqualTo(firstTriggerTime);
 
         stateStore.flush(ctx);
 
@@ -198,6 +223,9 @@ public abstract class AbstractAutoScalerStateStoreTest<
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
         
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
+        
assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
+        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
+                .isEqualTo(firstTriggerTime);
 
         stateStore.clearAll(ctx);
 
@@ -205,5 +233,7 @@ public abstract class AbstractAutoScalerStateStoreTest<
         assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
+        
assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
+        
assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index fe812bef..ffc0d85b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.autoscaler.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
@@ -43,6 +44,7 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
@@ -75,6 +77,8 @@ public class KubernetesAutoScalerStateStore
 
     protected static final String CONFIG_OVERRIDES_KEY = "configOverrides";
 
+    @VisibleForTesting protected static final String DELAYED_SCALE_DOWN = "delayedScaleDown";
+
     @VisibleForTesting protected static final int MAX_CM_BYTES = 1000000;
 
     protected static final ObjectMapper YAML_MAPPER =
@@ -104,6 +108,7 @@ public class KubernetesAutoScalerStateStore
                 jobContext, SCALING_TRACKING_KEY, serializeScalingTracking(scalingTrack));
     }
 
+    @Nonnull
     @Override
     public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory(
             KubernetesJobAutoScalerContext jobContext) {
@@ -154,6 +159,7 @@ public class KubernetesAutoScalerStateStore
                 jobContext, COLLECTED_METRICS_KEY, serializeEvaluatedMetrics(metrics));
     }
 
+    @Nonnull
     @Override
     public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(
             KubernetesJobAutoScalerContext jobContext) {
@@ -187,6 +193,7 @@ public class KubernetesAutoScalerStateStore
                 serializeParallelismOverrides(parallelismOverrides));
     }
 
+    @Nonnull
     @Override
     public Map<String, String> getParallelismOverrides(KubernetesJobAutoScalerContext jobContext) {
         return configMapStore
@@ -221,6 +228,34 @@ public class KubernetesAutoScalerStateStore
         configMapStore.removeSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY);
     }
 
+    @Override
+    public void storeDelayedScaleDown(
+            KubernetesJobAutoScalerContext jobContext, DelayedScaleDown delayedScaleDown)
+            throws Exception {
+        configMapStore.putSerializedState(
+                jobContext, DELAYED_SCALE_DOWN, serializeDelayedScaleDown(delayedScaleDown));
+    }
+
+    @Nonnull
+    @Override
+    public DelayedScaleDown getDelayedScaleDown(KubernetesJobAutoScalerContext jobContext) {
+        Optional<String> delayedScaleDown =
+                configMapStore.getSerializedState(jobContext, DELAYED_SCALE_DOWN);
+        if (delayedScaleDown.isEmpty()) {
+            return new DelayedScaleDown();
+        }
+
+        try {
+            return deserializeDelayedScaleDown(delayedScaleDown.get());
+        } catch (JacksonException e) {
+            LOG.error(
+                    "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
+                    e);
+            configMapStore.removeSerializedState(jobContext, DELAYED_SCALE_DOWN);
+            return new DelayedScaleDown();
+        }
+    }
+
     @Override
     public void clearAll(KubernetesJobAutoScalerContext jobContext) {
         configMapStore.clearAll(jobContext);
@@ -298,6 +333,18 @@ public class KubernetesAutoScalerStateStore
         }
     }
 
+    private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
+            throws JacksonException {
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+    }
+
+    private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
+            throws JacksonException {
+        Map<JobVertexID, Instant> firstTriggerTime =
+                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
+        return new DelayedScaleDown(firstTriggerTime);
+    }
+
     @VisibleForTesting
     protected void trimHistoryToMaxCmSize(KubernetesJobAutoScalerContext context) {
         int scalingHistorySize =

Reply via email to