This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 968a5785 [FLINK-36018][autoscaler] Support lazy scale down to avoid
frequent rescaling (#875)
968a5785 is described below
commit 968a578515d6269bbd5637594a7a342d74c1cd5c
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 6 16:55:18 2024 +0800
[FLINK-36018][autoscaler] Support lazy scale down to avoid frequent
rescaling (#875)
---
.../generated/auto_scaler_configuration.html | 12 +-
e2e-tests/data/autoscaler.yaml | 1 +
.../jdbc/state/JdbcAutoScalerStateStore.java | 43 +++
.../flink/autoscaler/jdbc/state/StateType.java | 3 +-
.../apache/flink/autoscaler/DelayedScaleDown.java | 69 ++++
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 13 +-
.../apache/flink/autoscaler/JobVertexScaler.java | 184 ++++++++---
.../apache/flink/autoscaler/ScalingExecutor.java | 67 ++--
.../flink/autoscaler/config/AutoScalerOptions.java | 15 +-
.../autoscaler/state/AutoScalerStateStore.java | 7 +
.../state/InMemoryAutoScalerStateStore.java | 23 +-
.../flink/autoscaler/BacklogBasedScalingTest.java | 2 +-
.../flink/autoscaler/JobVertexScalerTest.java | 350 +++++++++++++++------
.../MetricsCollectionAndEvaluationTest.java | 15 +-
.../autoscaler/RecommendedParallelismTest.java | 2 +-
.../flink/autoscaler/ScalingExecutorTest.java | 149 ++++++---
.../state/AbstractAutoScalerStateStoreTest.java | 30 ++
.../state/KubernetesAutoScalerStateStore.java | 47 +++
18 files changed, 802 insertions(+), 230 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index d9bed101..49ff71ef 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -146,18 +146,18 @@
<td>Duration</td>
<td>Maximum cap for the observed restart time when
'job.autoscaler.restart.time-tracking.enabled' is set to true.</td>
</tr>
+ <tr>
+ <td><h5>job.autoscaler.scale-down.interval</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The delay time for scale down to be executed. If it is greater
than 0, the scale down will be delayed. Delayed rescale can merge multiple
scale downs within `scale-down.interval` into a scale down, thereby reducing
the number of rescales. Reducing the frequency of job restarts can improve job
availability. Scale down can be executed directly if it's less than or equal
0.</td>
+ </tr>
<tr>
<td><h5>job.autoscaler.scale-down.max-factor</h5></td>
<td style="word-wrap: break-word;">0.6</td>
<td>Double</td>
<td>Max scale down factor. 1 means no limit on scale down, 0.6
means job can only be scaled down with 60% of the original parallelism.</td>
</tr>
- <tr>
- <td><h5>job.autoscaler.scale-up.grace-period</h5></td>
- <td style="word-wrap: break-word;">1 h</td>
- <td>Duration</td>
- <td>Duration in which no scale down of a vertex is allowed after
it has been scaled up.</td>
- </tr>
<tr>
<td><h5>job.autoscaler.scale-up.max-factor</h5></td>
<td style="word-wrap: break-word;">100000.0</td>
diff --git a/e2e-tests/data/autoscaler.yaml b/e2e-tests/data/autoscaler.yaml
index 9deebe92..5e0b3043 100644
--- a/e2e-tests/data/autoscaler.yaml
+++ b/e2e-tests/data/autoscaler.yaml
@@ -40,6 +40,7 @@ spec:
job.autoscaler.scaling.enabled: "true"
job.autoscaler.stabilization.interval: "5s"
job.autoscaler.metrics.window: "1m"
+ job.autoscaler.scale-down.interval: "0m"
# Invalid Validations for testing autoscaler configurations
# kubernetes.operator.job.autoscaler.scale-down.max-factor: "-0.6"
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 1224e0d4..5ac870fe 100644
---
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler.jdbc.state;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
@@ -49,6 +50,7 @@ import java.util.TreeMap;
import static
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
import static
org.apache.flink.autoscaler.jdbc.state.StateType.CONFIG_OVERRIDES;
+import static
org.apache.flink.autoscaler.jdbc.state.StateType.DELAYED_SCALE_DOWN;
import static
org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
import static
org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
@@ -214,6 +216,35 @@ public class JdbcAutoScalerStateStore<KEY, Context extends
JobAutoScalerContext<
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext),
CONFIG_OVERRIDES);
}
+    /**
+     * Persists the delayed scale down requests of a job as a serialized state entry of type
+     * DELAYED_SCALE_DOWN, keyed by the job's serialize key.
+     */
+    @Override
+    public void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown)
+            throws Exception {
+        var serializeKey = getSerializeKey(jobContext);
+        var serialized = serializeDelayedScaleDown(delayedScaleDown);
+        jdbcStateStore.putSerializedState(serializeKey, DELAYED_SCALE_DOWN, serialized);
+    }
+
+    /**
+     * Loads the delayed scale down requests of a job.
+     *
+     * <p>Falls back to a fresh, empty {@link DelayedScaleDown} when nothing is stored. If the
+     * stored entry cannot be deserialized, it is logged, removed from the store, and the empty
+     * fallback is returned as well.
+     */
+    @Nonnull
+    @Override
+    public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
+        Optional<String> serialized =
+                jdbcStateStore.getSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
+        if (serialized.isPresent()) {
+            try {
+                return deserializeDelayedScaleDown(serialized.get());
+            } catch (JacksonException e) {
+                LOG.error(
+                        "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
+                        e);
+                // Drop the unreadable entry so the next load starts from a clean state.
+                jdbcStateStore.removeSerializedState(
+                        getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
+            }
+        }
+        return new DelayedScaleDown();
+    }
+
@Override
public void clearAll(Context jobContext) {
jdbcStateStore.clearAll(getSerializeKey(jobContext));
@@ -296,4 +327,16 @@ public class JdbcAutoScalerStateStore<KEY, Context extends
JobAutoScalerContext<
return null;
}
}
+
+    /**
+     * Serializes the per-vertex first trigger times of {@code delayedScaleDown} with the YAML
+     * mapper. The transient "updated" flag is intentionally not part of the stored form.
+     */
+    private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
+            throws JacksonException {
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+    }
+
+    /**
+     * Rebuilds a {@link DelayedScaleDown} from the output of {@link
+     * #serializeDelayedScaleDown(DelayedScaleDown)}.
+     *
+     * <p>If the stored document decodes to {@code null} (e.g. a YAML {@code null}), an empty
+     * {@link DelayedScaleDown} is returned instead of one backed by a {@code null} map, which would
+     * otherwise fail on first use.
+     *
+     * @throws JacksonException if the stored text cannot be parsed
+     */
+    private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
+            throws JacksonException {
+        Map<JobVertexID, Instant> firstTriggerTime =
+                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
+        if (firstTriggerTime == null) {
+            return new DelayedScaleDown();
+        }
+        return new DelayedScaleDown(firstTriggerTime);
+    }
}
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
index ca2864f4..fa3da3f4 100644
---
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
+++
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
@@ -27,7 +27,8 @@ public enum StateType {
SCALING_TRACKING("scalingTracking"),
COLLECTED_METRICS("collectedMetrics"),
PARALLELISM_OVERRIDES("parallelismOverrides"),
- CONFIG_OVERRIDES("configOverrides");
+ CONFIG_OVERRIDES("configOverrides"),
+ DELAYED_SCALE_DOWN("delayedScaleDown");
/**
* The identifier of each state type, it will be used to store. Please
ensure the identifier is
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
new file mode 100644
index 00000000..c2402264
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Getter;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * All delayed scale down requests of a job.
+ *
+ * <p>For every vertex whose scale down is currently being delayed, this records the time at which
+ * that scale down was first requested. The instance is mutated in place; {@code isUpdated()}
+ * reports whether anything changed, so callers know whether it has to be stored again.
+ */
+public class DelayedScaleDown {
+
+    /** First trigger time of the pending scale down, per vertex. Always a mutable map. */
+    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+
+    // Has any scale down request been updated? It doesn't need to be stored; it is only used to
+    // determine whether this DelayedScaleDown needs to be stored.
+    @Getter private boolean isUpdated = false;
+
+    /** Creates an instance without any pending scale down request. */
+    public DelayedScaleDown() {
+        this.firstTriggerTime = new HashMap<>();
+    }
+
+    /**
+     * Creates an instance from previously stored first trigger times.
+     *
+     * @param firstTriggerTime stored trigger times per vertex. The entries are copied into a new
+     *     mutable map, so later updates work even when the caller passes an immutable or shared
+     *     map. {@code null} is treated as "no pending requests".
+     */
+    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
+        this.firstTriggerTime =
+                firstTriggerTime == null ? new HashMap<>() : new HashMap<>(firstTriggerTime);
+    }
+
+    /** Returns the first trigger time recorded for {@code vertex}, if there is one. */
+    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
+        return Optional.ofNullable(firstTriggerTime.get(vertex));
+    }
+
+    /** Records {@code instant} as the first trigger time of {@code vertex} and marks as updated. */
+    void updateTriggerTime(JobVertexID vertex, Instant instant) {
+        firstTriggerTime.put(vertex, instant);
+        isUpdated = true;
+    }
+
+    /** Drops the pending request of {@code vertex}; marks as updated only if one was removed. */
+    void clearVertex(JobVertexID vertex) {
+        Instant removed = firstTriggerTime.remove(vertex);
+        if (removed != null) {
+            isUpdated = true;
+        }
+    }
+
+    /** Drops all pending requests; marks as updated only if there was at least one. */
+    void clearAll() {
+        if (firstTriggerTime.isEmpty()) {
+            return;
+        }
+        firstTriggerTime.clear();
+        isUpdated = true;
+    }
+}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 19a23d5a..f3af3baa 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -214,9 +214,20 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
return;
}
+ var delayedScaleDown = stateStore.getDelayedScaleDown(ctx);
var parallelismChanged =
scalingExecutor.scaleResource(
- ctx, evaluatedMetrics, scalingHistory,
scalingTracking, now, jobTopology);
+ ctx,
+ evaluatedMetrics,
+ scalingHistory,
+ scalingTracking,
+ now,
+ jobTopology,
+ delayedScaleDown);
+
+ if (delayedScaleDown.isUpdated()) {
+ stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
+ }
if (parallelismChanged) {
autoscalerMetrics.incrementScaling();
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 7442532a..5a43ad36 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +38,12 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.SortedMap;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
@@ -51,6 +53,7 @@ import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.util.Preconditions.checkArgument;
/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
@@ -71,13 +74,79 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
this.autoScalerEventHandler = autoScalerEventHandler;
}
- public int computeScaleTargetParallelism(
+    /** Kind of parallelism change proposed for one vertex, see {@link ParallelismChange}. */
+    public enum ParallelismChangeType {
+        /** The vertex keeps its current parallelism. */
+        NO_CHANGE,
+        /** The vertex has to be rescaled; this alone can trigger a rescale of the job. */
+        REQUIRED_CHANGE,
+        /** The vertex may be rescaled, but does not trigger a rescale on its own. */
+        OPTIONAL_CHANGE
+    }
+
+    /**
+     * New parallelism proposed for a vertex, together with how strongly it is needed.
+     *
+     * <p>A rescale is only triggered if at least one vertex reports a {@link
+     * ParallelismChangeType#REQUIRED_CHANGE}. When every vertex's change is optional, the rescale
+     * is ignored.
+     */
+    @Getter
+    public static class ParallelismChange {
+
+        /** Shared "no change" instance; -1 is only a placeholder parallelism. */
+        private static final ParallelismChange NO_CHANGE =
+                new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
+
+        private final ParallelismChangeType changeType;
+        private final int newParallelism;
+
+        private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
+            this.changeType = changeType;
+            this.newParallelism = newParallelism;
+        }
+
+        /** Returns the shared instance meaning "keep the current parallelism". */
+        public static ParallelismChange noChange() {
+            return NO_CHANGE;
+        }
+
+        /** A change to {@code newParallelism} that can trigger a rescale by itself. */
+        public static ParallelismChange required(int newParallelism) {
+            return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
+        }
+
+        /** A change to {@code newParallelism} that does not trigger a rescale by itself. */
+        public static ParallelismChange optional(int newParallelism) {
+            return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            var other = (ParallelismChange) o;
+            return newParallelism == other.newParallelism && changeType == other.changeType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(changeType, newParallelism);
+        }
+
+        @Override
+        public String toString() {
+            return "ParallelismChange{changeType=" + changeType
+                    + ", newParallelism=" + newParallelism + '}';
+        }
+    }
+
+ public ParallelismChange computeScaleTargetParallelism(
Context context,
JobVertexID vertex,
Collection<ShipStrategy> inputShipStrategies,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
SortedMap<Instant, ScalingSummary> history,
- Duration restartTime) {
+ Duration restartTime,
+ DelayedScaleDown delayedScaleDown) {
var conf = context.getConfiguration();
var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -85,7 +154,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
LOG.warn(
"True processing rate is not available for {}, cannot
compute new parallelism",
vertex);
- return currentParallelism;
+ return ParallelismChange.noChange();
}
double targetCapacity =
@@ -95,7 +164,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
LOG.warn(
"Target data rate is not available for {}, cannot compute
new parallelism",
vertex);
- return currentParallelism;
+ return ParallelismChange.noChange();
}
LOG.debug("Target processing capacity for {} is {}", vertex,
targetCapacity);
@@ -131,65 +200,92 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
Math.min(currentParallelism,
conf.getInteger(VERTEX_MIN_PARALLELISM)),
Math.max(currentParallelism,
conf.getInteger(VERTEX_MAX_PARALLELISM)));
- if (newParallelism == currentParallelism
- || blockScalingBasedOnPastActions(
- context,
- vertex,
- conf,
- evaluatedMetrics,
- history,
- currentParallelism,
- newParallelism)) {
- return currentParallelism;
+ if (newParallelism == currentParallelism) {
+ // Clear delayed scale down request if the new parallelism is
equal to
+ // currentParallelism.
+ delayedScaleDown.clearVertex(vertex);
+ return ParallelismChange.noChange();
}
// We record our expectations for this scaling operation
evaluatedMetrics.put(
ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(cappedTargetCapacity));
- return newParallelism;
+
+ return detectBlockScaling(
+ context,
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism,
+ delayedScaleDown);
}
- private boolean blockScalingBasedOnPastActions(
+ private ParallelismChange detectBlockScaling(
Context context,
JobVertexID vertex,
Configuration conf,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
SortedMap<Instant, ScalingSummary> history,
int currentParallelism,
- int newParallelism) {
-
- // If we don't have past scaling actions for this vertex, there is
nothing to do
- if (history.isEmpty()) {
- return false;
- }
-
- boolean scaledUp = currentParallelism < newParallelism;
- var lastScalingTs = history.lastKey();
- var lastSummary = history.get(lastScalingTs);
+ int newParallelism,
+ DelayedScaleDown delayedScaleDown) {
+ checkArgument(
+ currentParallelism != newParallelism,
+ "The newParallelism is equal to currentParallelism, no scaling
is needed. This is probably a bug.");
+
+ var scaledUp = currentParallelism < newParallelism;
+
+ if (scaledUp) {
+ // Clear delayed scale down request if the new parallelism is
greater than
+ // currentParallelism.
+ delayedScaleDown.clearVertex(vertex);
+
+ // If we don't have past scaling actions for this vertex, don't
block scale up.
+ if (history.isEmpty()) {
+ return ParallelismChange.required(newParallelism);
+ }
- if (currentParallelism == lastSummary.getNewParallelism() &&
lastSummary.isScaledUp()) {
- if (scaledUp) {
- return detectIneffectiveScaleUp(
- context, vertex, conf, evaluatedMetrics, lastSummary);
- } else {
- return detectImmediateScaleDownAfterScaleUp(vertex, conf,
lastScalingTs);
+ var lastSummary = history.get(history.lastKey());
+ if (currentParallelism == lastSummary.getNewParallelism()
+ && lastSummary.isScaledUp()
+ && detectIneffectiveScaleUp(
+ context, vertex, conf, evaluatedMetrics,
lastSummary)) {
+ // Block scale up when last rescale is ineffective.
+ return ParallelismChange.noChange();
}
+
+ return ParallelismChange.required(newParallelism);
+ } else {
+ return applyScaleDownInterval(delayedScaleDown, vertex, conf,
newParallelism);
}
- return false;
}
- private boolean detectImmediateScaleDownAfterScaleUp(
- JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
+ private ParallelismChange applyScaleDownInterval(
+ DelayedScaleDown delayedScaleDown,
+ JobVertexID vertex,
+ Configuration conf,
+ int newParallelism) {
+ var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
+ if (scaleDownInterval.toMillis() <= 0) {
+ // The scale down interval is disable, so don't block scaling.
+ return ParallelismChange.required(newParallelism);
+ }
- var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
- if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
- LOG.info(
- "Skipping immediate scale down after scale up within grace
period for {}",
- vertex);
- return true;
+ var firstTriggerTime =
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
+ if (firstTriggerTime.isEmpty()) {
+ LOG.info("The scale down of {} is delayed by {}.", vertex,
scaleDownInterval);
+ delayedScaleDown.updateTriggerTime(vertex, clock.instant());
+ return ParallelismChange.optional(newParallelism);
+ }
+
+ if
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
+ LOG.debug("Try to skip immediate scale down within scale-down
interval for {}", vertex);
+ return ParallelismChange.optional(newParallelism);
} else {
- return false;
+ return ParallelismChange.required(newParallelism);
}
}
@@ -258,7 +354,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
double scaleFactor,
int parallelismLowerLimit,
int parallelismUpperLimit) {
- Preconditions.checkArgument(
+ checkArgument(
parallelismLowerLimit <= parallelismUpperLimit,
"The parallelism lower limitation must not be greater than the
parallelism upper limitation.");
if (parallelismLowerLimit > maxParallelism) {
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index af325371..02e5ad4f 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -45,9 +45,13 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
+import static
org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
+import static
org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -100,25 +104,26 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory,
ScalingTracking scalingTracking,
Instant now,
- JobTopology jobTopology)
+ JobTopology jobTopology,
+ DelayedScaleDown delayedScaleDown)
throws Exception {
var conf = context.getConfiguration();
var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
var scalingSummaries =
computeScalingSummary(
- context, evaluatedMetrics, scalingHistory,
restartTime, jobTopology);
+ context,
+ evaluatedMetrics,
+ scalingHistory,
+ restartTime,
+ jobTopology,
+ delayedScaleDown);
if (scalingSummaries.isEmpty()) {
LOG.info("All job vertices are currently running at their target
parallelism.");
return false;
}
- if (allVerticesWithinUtilizationTarget(
- evaluatedMetrics.getVertexMetrics(), scalingSummaries)) {
- return false;
- }
-
updateRecommendedParallelism(evaluatedMetrics.getVertexMetrics(),
scalingSummaries);
if (checkIfBlockedAndTriggerScalingEvent(context, scalingSummaries,
conf, now)) {
@@ -156,6 +161,9 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
autoScalerStateStore.storeConfigChanges(context, configOverrides);
+ // Try to clear all delayed scale down requests after scaling.
+ delayedScaleDown.clearAll();
+
return true;
}
@@ -172,12 +180,16 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
scalingSummary.getNewParallelism())));
}
- protected static boolean allVerticesWithinUtilizationTarget(
+ @VisibleForTesting
+ static boolean allRequiredVerticesWithinUtilizationTarget(
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics,
- Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Set<JobVertexID> requiredVertices) {
+ // All vertices' ParallelismChange is optional, rescaling will be
ignored.
+ if (requiredVertices.isEmpty()) {
+ return true;
+ }
- for (Map.Entry<JobVertexID, ScalingSummary> entry :
scalingSummaries.entrySet()) {
- var vertex = entry.getKey();
+ for (JobVertexID vertex : requiredVertices) {
var metrics = evaluatedMetrics.get(vertex);
double trueProcessingRate =
metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -212,7 +224,8 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory,
Duration restartTime,
- JobTopology jobTopology) {
+ JobTopology jobTopology,
+ DelayedScaleDown delayedScaleDown) {
LOG.debug("Restart time used in scaling summary computation: {}",
restartTime);
if (isJobUnderMemoryPressure(context,
evaluatedMetrics.getGlobalMetrics())) {
@@ -221,6 +234,8 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
}
var out = new HashMap<JobVertexID, ScalingSummary>();
+ var requiredVertices = new HashSet<JobVertexID>();
+
var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
evaluatedMetrics
@@ -235,7 +250,7 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
var currentParallelism =
(int)
metrics.get(ScalingMetric.PARALLELISM).getCurrent();
- var newParallelism =
+ var parallelismChange =
jobVertexScaler.computeScaleTargetParallelism(
context,
v,
@@ -243,15 +258,29 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
metrics,
scalingHistory.getOrDefault(
v,
Collections.emptySortedMap()),
- restartTime);
- if (currentParallelism != newParallelism) {
- out.put(
- v,
- new ScalingSummary(
- currentParallelism,
newParallelism, metrics));
+ restartTime,
+ delayedScaleDown);
+ if (NO_CHANGE ==
parallelismChange.getChangeType()) {
+ return;
+ } else if (REQUIRED_CHANGE ==
parallelismChange.getChangeType()) {
+ requiredVertices.add(v);
}
+ out.put(
+ v,
+ new ScalingSummary(
+ currentParallelism,
+
parallelismChange.getNewParallelism(),
+ metrics));
}
});
+
+ // If the Utilization of all required tasks is within range, we can
skip scaling.
+ // It means that if only optional tasks are out of scope, we still
need to ignore scale.
+ if (allRequiredVerticesWithinUtilizationTarget(
+ evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
+ return Map.of();
+ }
+
return out;
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 6922448b..e1ea6a86 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -102,13 +102,20 @@ public class AutoScalerOptions {
.withFallbackKeys(oldOperatorConfigKey("target.utilization.boundary"))
.withDescription(
"Target vertex utilization boundary. Scaling won't
be performed if the processing capacity is within [target_rate /
(target_utilization - boundary), (target_rate / (target_utilization +
boundary)]");
- public static final ConfigOption<Duration> SCALE_UP_GRACE_PERIOD =
- autoScalerConfig("scale-up.grace-period")
+
+ public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
+ autoScalerConfig("scale-down.interval")
.durationType()
.defaultValue(Duration.ofHours(1))
-
.withFallbackKeys(oldOperatorConfigKey("scale-up.grace-period"))
+
.withDeprecatedKeys(autoScalerConfigKey("scale-up.grace-period"))
+ .withFallbackKeys(
+ oldOperatorConfigKey("scale-up.grace-period"),
+ oldOperatorConfigKey("scale-down.interval"))
.withDescription(
- "Duration in which no scale down of a vertex is
allowed after it has been scaled up.");
+ "The delay time for scale down to be executed. If
it is greater than 0, the scale down will be delayed. "
+ + "Delayed rescale can merge multiple
scale downs within `scale-down.interval` into a scale down, thereby reducing
the number of rescales. "
+ + "Reducing the frequency of job restarts
can improve job availability. "
+ + "Scale down can be executed directly if
it's less than or equal 0.");
public static final ConfigOption<Integer> VERTEX_MIN_PARALLELISM =
autoScalerConfig("vertex.min-parallelism")
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 7fb369cf..15581b75 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -18,6 +18,7 @@
package org.apache.flink.autoscaler.state;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
@@ -77,6 +78,12 @@ public interface AutoScalerStateStore<KEY, Context extends
JobAutoScalerContext<
void removeConfigChanges(Context jobContext) throws Exception;
+    /**
+     * Stores the delayed scale down requests of the given job.
+     *
+     * @param jobContext the job whose state is written
+     * @param delayedScaleDown the delayed scale down requests to store
+     */
+    void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown)
+            throws Exception;
+
+    /**
+     * Returns the delayed scale down requests of the given job; never {@code null}.
+     *
+     * <p>Implementations are expected to return an empty {@link DelayedScaleDown} when nothing
+     * has been stored for the job.
+     *
+     * @param jobContext the job whose state is read
+     */
+    @Nonnull
+    DelayedScaleDown getDelayedScaleDown(Context jobContext) throws Exception;
+
/** Removes all data from this context. Flush stil needs to be called. */
void clearAll(Context jobContext) throws Exception;
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 35084db3..cf6aa74f 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -17,6 +17,7 @@
package org.apache.flink.autoscaler.state;
+import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
@@ -54,12 +55,15 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
private final Map<KEY, ScalingTracking> scalingTrackingStore;
+ private final Map<KEY, DelayedScaleDown> delayedScaleDownStore;
+
public InMemoryAutoScalerStateStore() {
scalingHistoryStore = new ConcurrentHashMap<>();
collectedMetricsStore = new ConcurrentHashMap<>();
parallelismOverridesStore = new ConcurrentHashMap<>();
scalingTrackingStore = new ConcurrentHashMap<>();
- tmConfigOverrides = new ConcurrentHashMap<KEY, ConfigChanges>();
+ tmConfigOverrides = new ConcurrentHashMap<>();
+ delayedScaleDownStore = new ConcurrentHashMap<>();
}
@Override
@@ -69,6 +73,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
scalingHistoryStore.put(jobContext.getJobKey(), scalingHistory);
}
+ @Nonnull
@Override
public Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
getScalingHistory(
Context jobContext) {
@@ -98,6 +103,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
collectedMetricsStore.put(jobContext.getJobKey(), metrics);
}
+ @Nonnull
@Override
public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context
jobContext) {
return
Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey()))
@@ -115,6 +121,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
parallelismOverridesStore.put(jobContext.getJobKey(),
parallelismOverrides);
}
+ @Nonnull
@Override
public Map<String, String> getParallelismOverrides(Context jobContext) {
return
Optional.ofNullable(parallelismOverridesStore.get(jobContext.getJobKey()))
@@ -143,12 +150,26 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
parallelismOverridesStore.remove(jobContext.getJobKey());
}
+ @Override
+ public void storeDelayedScaleDown(Context jobContext, DelayedScaleDown
delayedScaleDown) {
+ delayedScaleDownStore.put(jobContext.getJobKey(), delayedScaleDown);
+ }
+
+ @Nonnull
+ @Override
+ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
+ return
Optional.ofNullable(delayedScaleDownStore.get(jobContext.getJobKey()))
+ .orElse(new DelayedScaleDown());
+ }
+
@Override
public void clearAll(Context jobContext) {
scalingHistoryStore.remove(jobContext.getJobKey());
parallelismOverridesStore.remove(jobContext.getJobKey());
collectedMetricsStore.remove(jobContext.getJobKey());
tmConfigOverrides.remove(jobContext.getJobKey());
+ scalingTrackingStore.remove(jobContext.getJobKey());
+ delayedScaleDownStore.remove(jobContext.getJobKey());
}
@Override
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 3fde1640..0aae09c9 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -97,7 +97,7 @@ public class BacklogBasedScalingTest {
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double)
Integer.MAX_VALUE);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
- defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD,
Duration.ZERO);
+ defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD,
Duration.ofSeconds(1));
autoscaler =
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 7856a3e7..17ade6c8 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobVertexScaler.ParallelismChange;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
@@ -36,6 +37,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -48,7 +50,6 @@ import static
org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for vertex parallelism scaler logic. */
@@ -57,6 +58,8 @@ public class JobVertexScalerTest {
private static final Collection<ShipStrategy> NOT_ADJUST_INPUTS =
List.of(ShipStrategy.REBALANCE, ShipStrategy.RESCALE);
+ private final JobVertexID vertex = new JobVertexID();
+
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>>
eventCollector;
private JobVertexScaler<JobID, JobAutoScalerContext<JobID>> vertexScaler;
private JobAutoScalerContext<JobID> context;
@@ -95,127 +98,139 @@ public class JobVertexScalerTest {
public void testParallelismScaling(Collection<ShipStrategy>
inputShipStrategies) {
var op = new JobVertexID();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- 5,
+ ParallelismChange.optional(5),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 50, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
- 8,
+ ParallelismChange.optional(8),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 50, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
- 10,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 80, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
- 8,
+ ParallelismChange.optional(8),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 60, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
assertEquals(
- 8,
+ ParallelismChange.optional(8),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 59, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
assertEquals(
- 10,
+ ParallelismChange.required(10),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(2, 100, 40),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
assertEquals(
- 4,
+ ParallelismChange.required(4),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(2, 100, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
- 5,
+ ParallelismChange.optional(5),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 10, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
assertEquals(
- 4,
+ ParallelismChange.optional(4),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 10, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
assertEquals(
- 15,
+ ParallelismChange.required(15),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 200, 10),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
assertEquals(
- 16,
+ ParallelismChange.required(16),
vertexScaler.computeScaleTargetParallelism(
context,
op,
inputShipStrategies,
evaluated(10, 200, 10),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
}
@Test
@@ -319,95 +334,179 @@ public class JobVertexScalerTest {
@Test
public void testMinParallelismLimitIsUsed() {
conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+ var delayedScaleDown = new DelayedScaleDown();
+
assertEquals(
- 5,
+ ParallelismChange.optional(5),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
NOT_ADJUST_INPUTS,
evaluated(10, 100, 500),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
// Make sure we respect current parallelism in case it's lower
assertEquals(
- 4,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
NOT_ADJUST_INPUTS,
evaluated(4, 100, 500),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
}
@Test
public void testMaxParallelismLimitIsUsed() {
conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ var delayedScaleDown = new DelayedScaleDown();
+
assertEquals(
- 10,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
NOT_ADJUST_INPUTS,
evaluated(10, 500, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
// Make sure we respect current parallelism in case it's higher
assertEquals(
- 12,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
NOT_ADJUST_INPUTS,
evaluated(12, 500, 100),
Collections.emptySortedMap(),
- restartTime));
+ restartTime,
+ delayedScaleDown));
}
@Test
- public void testScaleDownAfterScaleUpDetection() {
- var op = new JobVertexID();
+ public void testDisableScaleDownInterval() {
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD,
Duration.ofMinutes(1));
- var clock = Clock.systemDefaultZone();
- vertexScaler.setClock(clock);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));
- var evaluated = evaluated(5, 100, 50);
- var history = new TreeMap<Instant, ScalingSummary>();
- assertEquals(
- 10,
- vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ var delayedScaleDown = new DelayedScaleDown();
- history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
+ assertParallelismChange(10, 50, 100, ParallelismChange.required(5),
delayedScaleDown);
+ }
- // Should not allow scale back down immediately
- evaluated = evaluated(10, 50, 100);
- assertEquals(
- 10,
- vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ @Test
+ public void testRequiredScaleDownAfterInterval() {
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+ var instant = Instant.now();
- // Pass some time...
- clock = Clock.offset(Clock.systemDefaultZone(),
Duration.ofSeconds(61));
- vertexScaler.setClock(clock);
+ var delayedScaleDown = new DelayedScaleDown();
- assertEquals(
- 5,
- vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
- history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
+ // The scale down shouldn't be required.
+ vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.optional(80), delayedScaleDown);
+
+ // Within scale down interval.
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 900, 1000,
ParallelismChange.optional(90), delayedScaleDown);
+
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(20)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 820, 1000,
ParallelismChange.optional(82), delayedScaleDown);
+
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(40)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 720, 1000,
ParallelismChange.optional(72), delayedScaleDown);
+
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(50)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 600, 1000,
ParallelismChange.optional(60), delayedScaleDown);
+
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(59)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 640, 1000,
ParallelismChange.optional(64), delayedScaleDown);
+
+ // The scale down is required after the scale down interval ends.
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(60)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 700, 1000,
ParallelismChange.required(70), delayedScaleDown);
+ }
+
+ @Test
+ public void testImmediateScaleUpWithinScaleDownInterval() {
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+ var instant = Instant.now();
+
+ var delayedScaleDown = new DelayedScaleDown();
+
+ // The scale down shouldn't be required.
+ vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.optional(80), delayedScaleDown);
+ assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+
+ // Within scale down interval.
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 900, 1000,
ParallelismChange.optional(90), delayedScaleDown);
+ assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
// Allow immediate scale up
- evaluated = evaluated(5, 100, 50);
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(12)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 1700, 1000,
ParallelismChange.required(170), delayedScaleDown);
+ assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+ }
+
+ @Test
+ public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+ var instant = Instant.now();
+
+ var delayedScaleDown = new DelayedScaleDown();
+
+ // The scale down shouldn't be required.
+ vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.optional(80), delayedScaleDown);
+ assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+
+ // Within scale down interval.
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 900, 1000,
ParallelismChange.optional(90), delayedScaleDown);
+ assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+
+ // The delayed scale down is canceled when new parallelism is same
with current parallelism.
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(12)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 1000, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+ }
+
+ private void assertParallelismChange(
+ int parallelism,
+ int targetDataRate,
+ int trueProcessingRate,
+ ParallelismChange expectedParallelismChange,
+ DelayedScaleDown delayedScaleDown) {
assertEquals(
- 10,
+ expectedParallelismChange,
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
- history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
+ context,
+ vertex,
+ NOT_ADJUST_INPUTS,
+ evaluated(parallelism, targetDataRate,
trueProcessingRate),
+ new TreeMap<>(),
+ restartTime,
+ delayedScaleDown));
}
@Test
@@ -415,23 +514,37 @@ public class JobVertexScalerTest {
var op = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
true);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
+ var delayedScaleDown = new DelayedScaleDown();
+
assertEquals(
- 10,
+ ParallelismChange.required(10),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
assertEquals(100,
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
// Allow to scale higher if scaling was effective (80%)
evaluated = evaluated(10, 180, 90);
assertEquals(
- 20,
+ ParallelismChange.required(20),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
assertEquals(180,
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
@@ -439,50 +552,84 @@ public class JobVertexScalerTest {
// 90 -> 94. Do not try to scale above 20
evaluated = evaluated(20, 180, 94);
assertEquals(
- 20,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
-
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
// Still considered ineffective (less than <10%)
evaluated = evaluated(20, 180, 98);
assertEquals(
- 20,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
-
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
// Allow scale up if current parallelism doesnt match last (user
rescaled manually)
evaluated = evaluated(10, 180, 90);
assertEquals(
- 20,
+ ParallelismChange.required(20),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
// Over 10%, effective
evaluated = evaluated(20, 180, 100);
assertEquals(
- 36,
+ ParallelismChange.required(36),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
// Ineffective but detection is turned off
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
false);
evaluated = evaluated(20, 180, 90);
assertEquals(
- 40,
+ ParallelismChange.required(40),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
true);
// Allow scale down even if ineffective
evaluated = evaluated(20, 45, 90);
assertEquals(
- 10,
+ ParallelismChange.required(10),
vertexScaler.computeScaleTargetParallelism(
- context, op, NOT_ADJUST_INPUTS, evaluated, history,
restartTime));
+ context,
+ op,
+ NOT_ADJUST_INPUTS,
+ evaluated,
+ history,
+ restartTime,
+ delayedScaleDown));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
}
@@ -492,33 +639,37 @@ public class JobVertexScalerTest {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
true);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
- conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
+ var delayedScaleDown = new DelayedScaleDown();
+
assertEquals(
- 10,
+ ParallelismChange.required(10),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
+ restartTime,
+ delayedScaleDown));
assertEquals(100,
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
// Effective scale, no events triggered
evaluated = evaluated(10, 180, 90);
assertEquals(
- 20,
+ ParallelismChange.required(20),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
+ restartTime,
+ delayedScaleDown));
assertEquals(180,
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
assertEquals(0, eventCollector.events.size());
@@ -526,15 +677,15 @@ public class JobVertexScalerTest {
// Ineffective scale, an event is triggered
evaluated = evaluated(20, 180, 95);
assertEquals(
- 20,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
-
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ restartTime,
+ delayedScaleDown));
assertEquals(1, eventCollector.events.size());
var event = eventCollector.events.poll();
assertThat(event).isNotNull();
@@ -552,15 +703,15 @@ public class JobVertexScalerTest {
ScalingMetric.TRUE_PROCESSING_RATE,
EvaluatedScalingMetric.avg(tpr.getAverage() + 0.01));
assertEquals(
- 20,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
-
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ restartTime,
+ delayedScaleDown));
assertEquals(0, eventCollector.events.size());
// reset tpr
@@ -569,29 +720,29 @@ public class JobVertexScalerTest {
// Repeat ineffective scale with postive interval, no event is
triggered
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL,
Duration.ofSeconds(1800));
assertEquals(
- 20,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
-
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ restartTime,
+ delayedScaleDown));
assertEquals(0, eventCollector.events.size());
// Ineffective scale with interval set to 0, an event is triggered
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
assertEquals(
- 20,
+ ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
-
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ restartTime,
+ delayedScaleDown));
assertEquals(1, eventCollector.events.size());
event = eventCollector.events.poll();
assertThat(event).isNotNull();
@@ -605,14 +756,15 @@ public class JobVertexScalerTest {
// Test ineffective scaling switched off
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
false);
assertEquals(
- 40,
+ ParallelismChange.required(40),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
evaluated,
history,
- restartTime));
+ restartTime,
+ delayedScaleDown));
assertEquals(1, eventCollector.events.size());
event = eventCollector.events.poll();
assertThat(event).isNotNull();
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index ade85b6e..1d3d9bff 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -187,7 +187,8 @@ public class MetricsCollectionAndEvaluationTest {
new HashMap<>(),
new ScalingTracking(),
clock.instant(),
- topology);
+ topology,
+ new DelayedScaleDown());
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(4, scaledParallelism.size());
@@ -364,7 +365,7 @@ public class MetricsCollectionAndEvaluationTest {
public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception {
var topology = new JobTopology(new VertexInfo(source1, Map.of(), 5,
720));
- metricsCollector = new TestingMetricsCollector(topology);
+ metricsCollector = new TestingMetricsCollector<>(topology);
metricsCollector.setJobUpdateTs(startTime);
metricsCollector.updateMetrics(
@@ -378,6 +379,7 @@ public class MetricsCollectionAndEvaluationTest {
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
metricsCollector.setClock(clock);
@@ -424,7 +426,8 @@ public class MetricsCollectionAndEvaluationTest {
new HashMap<>(),
new ScalingTracking(),
clock.instant(),
- topology);
+ topology,
+ new DelayedScaleDown());
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(1, scaledParallelism.get(source1));
}
@@ -634,6 +637,9 @@ public class MetricsCollectionAndEvaluationTest {
metricsCollector = new TestingMetricsCollector<>(topology);
metricsCollector.setJobUpdateTs(startTime);
+ var conf = context.getConfiguration();
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
+
metricsCollector.updateMetrics(
source1,
TestMetrics.builder()
@@ -681,7 +687,8 @@ public class MetricsCollectionAndEvaluationTest {
new HashMap<>(),
new ScalingTracking(),
clock.instant(),
- topology);
+ topology,
+ new DelayedScaleDown());
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(1, scaledParallelism.get(source1));
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 197cb94c..4b35d72a 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -88,7 +88,7 @@ public class RecommendedParallelismTest {
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double)
Integer.MAX_VALUE);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
- defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD,
Duration.ZERO);
+ defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
autoscaler =
new JobAutoScalerImpl<>(
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 97972269..50e5870a 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -50,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -82,7 +83,7 @@ public class ScalingExecutorTest {
private Configuration capturedConfForMaxResources;
- private ScalingTracking scalingTracking = new ScalingTracking();
+ private final ScalingTracking scalingTracking = new ScalingTracking();
private static final Map<ScalingMetric, EvaluatedScalingMetric>
dummyGlobalMetrics =
Map.of(
@@ -119,7 +120,7 @@ public class ScalingExecutorTest {
}
@Test
- public void testUtilizationBoundaries() throws Exception {
+ public void testUtilizationBoundariesForAllRequiredVertices() throws
Exception {
// Restart time should not affect utilization boundary
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
@@ -131,13 +132,15 @@ public class ScalingExecutorTest {
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
var evaluated = Map.of(op1, evaluated(1, 70, 100));
- var scalingSummary = Map.of(op1, new ScalingSummary(2, 1,
evaluated.get(op1)));
-
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated,
scalingSummary));
+ assertFalse(
+ ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
evaluated = Map.of(op1, evaluated(1, 70, 100));
- scalingSummary = Map.of(op1, new ScalingSummary(2, 1,
evaluated.get(op1)));
-
assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated,
scalingSummary));
+ assertTrue(
+ ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
assertTrue(getScaledParallelism(stateStore, context).isEmpty());
var op2 = new JobVertexID();
@@ -145,31 +148,59 @@ public class ScalingExecutorTest {
Map.of(
op1, evaluated(1, 70, 100),
op2, evaluated(1, 85, 100));
- scalingSummary =
- Map.of(
- op1,
- new ScalingSummary(1, 2, evaluated.get(op1)),
- op2,
- new ScalingSummary(1, 2, evaluated.get(op2)));
-
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated,
scalingSummary));
+ assertFalse(
+ ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
evaluated =
Map.of(
op1, evaluated(1, 70, 100),
op2, evaluated(1, 70, 100));
- scalingSummary =
- Map.of(
- op1,
- new ScalingSummary(1, 2, evaluated.get(op1)),
- op2,
- new ScalingSummary(1, 2, evaluated.get(op2)));
-
assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated,
scalingSummary));
+ assertTrue(
+ ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
// Test with backlog based scaling
evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
- scalingSummary = Map.of(op1, new ScalingSummary(1, 2,
evaluated.get(op1)));
-
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated,
scalingSummary));
+ assertFalse(
+ ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
+ }
+
+ @Test
+ public void testUtilizationBoundariesWithOptionalVertex() {
+ // Restart time should not affect utilization boundary
+ var conf = context.getConfiguration();
+ conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+ conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+ var op1 = new JobVertexID();
+ var op2 = new JobVertexID();
+
+ // All vertices are optional
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+ var evaluated =
+ Map.of(
+ op1, evaluated(1, 70, 100),
+ op2, evaluated(1, 85, 100));
+
+
assertTrue(ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated,
Set.of()));
+
+ // One vertex is required, and it's out of range.
+ assertFalse(
+
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated,
Set.of(op1)));
+
+ // One vertex is required, and it's within the range.
+ // The op2 is optional, so it shouldn't affect the scaling even if it
is out of range,
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ evaluated =
+ Map.of(
+ op1, evaluated(1, 65, 100),
+ op2, evaluated(1, 85, 100));
+ assertTrue(
+
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated,
Set.of(op1)));
}
@Test
@@ -193,17 +224,9 @@ public class ScalingExecutorTest {
Map.of(vertex, evaluated(parallelism, targetRate,
trueProcessingRate)),
dummyGlobalMetrics);
- // Verify precondition
- var scalingSummary =
- Map.of(
- vertex,
- new ScalingSummary(
- parallelism,
- expectedParallelism,
- evaluated.getVertexMetrics().get(vertex)));
assertTrue(
- ScalingExecutor.allVerticesWithinUtilizationTarget(
- evaluated.getVertexMetrics(), scalingSummary));
+ ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+ evaluated.getVertexMetrics(),
evaluated.getVertexMetrics().keySet()));
// Execute the full scaling path
var now = Instant.now();
@@ -217,7 +240,13 @@ public class ScalingExecutorTest {
new IOMetrics(10000, 10000, 100)));
assertFalse(
scalingExecutor.scaleResource(
- context, evaluated, new HashMap<>(), scalingTracking,
now, jobTopology));
+ context,
+ evaluated,
+ new HashMap<>(),
+ scalingTracking,
+ now,
+ jobTopology,
+ new DelayedScaleDown()));
}
@Test
@@ -236,6 +265,7 @@ public class ScalingExecutorTest {
new VertexInfo(sink, Map.of(filterOperator, HASH), 10,
1000, false, null));
var conf = context.getConfiguration();
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
var metrics =
new EvaluatedMetrics(
@@ -250,6 +280,7 @@ public class ScalingExecutorTest {
// filter operator should not scale
conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS,
List.of(filterOperatorHexString));
var now = Instant.now();
+ var delayedScaleDown = new DelayedScaleDown();
assertFalse(
scalingExecutor.scaleResource(
context,
@@ -257,7 +288,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
// filter operator should scale
conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
assertTrue(
@@ -267,7 +299,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
}
@Test
@@ -297,6 +330,7 @@ public class ScalingExecutorTest {
new EvaluatedMetrics(
Map.of(source, evaluated(10, 110, 100), sink,
evaluated(10, 110, 100)),
dummyGlobalMetrics);
+ var delayedScaleDown = new DelayedScaleDown();
assertFalse(
scalingExecutor.scaleResource(
context,
@@ -304,7 +338,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
// scaling execution outside excluded periods
excludedPeriod =
new
StringBuilder(localTime.plusSeconds(100).toString().split("\\.")[0])
@@ -319,7 +354,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
}
@Test
@@ -347,6 +383,7 @@ public class ScalingExecutorTest {
EvaluatedScalingMetric.of(Double.NaN)));
// Would normally scale without resource usage check
+ var delayedScaleDown = new DelayedScaleDown();
assertTrue(
scalingExecutor.scaleResource(
context,
@@ -354,7 +391,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
scalingExecutor =
new ScalingExecutor<>(
@@ -379,7 +417,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
}
@ParameterizedTest
@@ -426,7 +465,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ new DelayedScaleDown()));
Map<String, String> expected;
if (memoryTuningEnabled) {
assertNotEquals(context.getConfiguration(),
capturedConfForMaxResources);
@@ -474,6 +514,7 @@ public class ScalingExecutorTest {
}
private void testScalingEvents(boolean scalingEnabled, Duration interval)
throws Exception {
+ var delayedScaleDown = new DelayedScaleDown();
var jobVertexID = new JobVertexID();
JobTopology jobTopology =
@@ -497,7 +538,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
assertEquals(
scalingEnabled,
scalingExecutor.scaleResource(
@@ -506,7 +548,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
int expectedSize = (interval == null || interval.toMillis() > 0) &&
!scalingEnabled ? 1 : 2;
assertEquals(expectedSize, eventCollector.events.size());
@@ -545,7 +588,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
var event2 = eventCollector.events.poll();
assertThat(event2).isNotNull();
assertThat(event2.getContext()).isSameAs(event.getContext());
@@ -555,6 +599,7 @@ public class ScalingExecutorTest {
@Test
public void testScalingUnderGcPressure() throws Exception {
+ var delayedScaleDown = new DelayedScaleDown();
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_ENABLED, true);
conf.set(AutoScalerOptions.GC_PRESSURE_THRESHOLD, 0.5);
@@ -580,7 +625,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
Instant.now(),
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
// Just below the thresholds
metrics =
@@ -598,7 +644,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
Instant.now(),
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
eventCollector.events.clear();
@@ -618,7 +665,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
Instant.now(),
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
assertEquals("MemoryPressure",
eventCollector.events.poll().getReason());
assertTrue(eventCollector.events.isEmpty());
@@ -638,7 +686,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
Instant.now(),
- jobTopology));
+ jobTopology,
+ delayedScaleDown));
assertEquals("MemoryPressure",
eventCollector.events.poll().getReason());
assertTrue(eventCollector.events.isEmpty());
}
@@ -692,7 +741,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
now,
- jobTopology))
+ jobTopology,
+ new DelayedScaleDown()))
.isTrue();
Map<String, String> parallelismOverrides =
stateStore.getParallelismOverrides(context);
@@ -816,7 +866,8 @@ public class ScalingExecutorTest {
new HashMap<>(),
new ScalingTracking(),
Instant.now(),
- jobTopology));
+ jobTopology,
+ new DelayedScaleDown()));
if (quotaReached) {
assertEquals("ScalingReport",
eventCollector.events.poll().getReason());
assertEquals("ResourceQuotaReached",
eventCollector.events.poll().getReason());
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index da0f0c2c..3dff8406 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -17,8 +17,11 @@
package org.apache.flink.autoscaler.state;
+import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingRecord;
import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
@@ -176,6 +179,13 @@ public abstract class AbstractAutoScalerStateStoreTest<
@Test
protected void testDiscardAllState() throws Exception {
+ assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
+ assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
+ assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+ assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
+
assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
+
assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
+
stateStore.storeCollectedMetrics(
ctx, new TreeMap<>(Map.of(Instant.now(), new
CollectedMetrics())));
stateStore.storeScalingHistory(
@@ -187,10 +197,25 @@ public abstract class AbstractAutoScalerStateStoreTest<
stateStore.storeConfigChanges(
ctx, new ConfigChanges().addOverride("config.value", "value"));
+ var scalingTracking = new ScalingTracking();
+ scalingTracking.addScalingRecord(
+ Instant.now().minus(Duration.ofHours(3)), new ScalingRecord());
+ scalingTracking.addScalingRecord(
+ Instant.now().minus(Duration.ofHours(2)), new ScalingRecord());
+ scalingTracking.addScalingRecord(
+ Instant.now().minus(Duration.ofHours(1)), new ScalingRecord());
+ stateStore.storeScalingTracking(ctx, scalingTracking);
+
+ var firstTriggerTime = Map.of(new JobVertexID(), Instant.now());
+ stateStore.storeDelayedScaleDown(ctx, new
DelayedScaleDown(firstTriggerTime));
+
assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
+
assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
+ assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
+ .isEqualTo(firstTriggerTime);
stateStore.flush(ctx);
@@ -198,6 +223,9 @@ public abstract class AbstractAutoScalerStateStoreTest<
assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
+
assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
+ assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
+ .isEqualTo(firstTriggerTime);
stateStore.clearAll(ctx);
@@ -205,5 +233,7 @@ public abstract class AbstractAutoScalerStateStoreTest<
assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
+
assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
+
assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index fe812bef..ffc0d85b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.autoscaler.state;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
@@ -43,6 +44,7 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
@@ -75,6 +77,8 @@ public class KubernetesAutoScalerStateStore
protected static final String CONFIG_OVERRIDES_KEY = "configOverrides";
+    @VisibleForTesting protected static final String DELAYED_SCALE_DOWN =
"delayedScaleDown"; // configMapStore key for the serialized DelayedScaleDown state
+
@VisibleForTesting protected static final int MAX_CM_BYTES = 1000000;
protected static final ObjectMapper YAML_MAPPER =
@@ -104,6 +108,7 @@ public class KubernetesAutoScalerStateStore
jobContext, SCALING_TRACKING_KEY,
serializeScalingTracking(scalingTrack));
}
+ @Nonnull
@Override
public Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
getScalingHistory(
KubernetesJobAutoScalerContext jobContext) {
@@ -154,6 +159,7 @@ public class KubernetesAutoScalerStateStore
jobContext, COLLECTED_METRICS_KEY,
serializeEvaluatedMetrics(metrics));
}
+ @Nonnull
@Override
public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(
KubernetesJobAutoScalerContext jobContext) {
@@ -187,6 +193,7 @@ public class KubernetesAutoScalerStateStore
serializeParallelismOverrides(parallelismOverrides));
}
+ @Nonnull
@Override
public Map<String, String>
getParallelismOverrides(KubernetesJobAutoScalerContext jobContext) {
return configMapStore
@@ -221,6 +228,34 @@ public class KubernetesAutoScalerStateStore
configMapStore.removeSerializedState(jobContext,
PARALLELISM_OVERRIDES_KEY);
}
+    @Override
+    public void storeDelayedScaleDown(
+            KubernetesJobAutoScalerContext jobContext, DelayedScaleDown
delayedScaleDown)
+            throws Exception { // Stores the string from serializeDelayedScaleDown under DELAYED_SCALE_DOWN
+        configMapStore.putSerializedState(
+                jobContext, DELAYED_SCALE_DOWN,
serializeDelayedScaleDown(delayedScaleDown));
+    }
+
+    @Nonnull // Never null: falls back to an empty DelayedScaleDown when nothing is stored or it is unreadable
+    @Override
+    public DelayedScaleDown getDelayedScaleDown(KubernetesJobAutoScalerContext
jobContext) {
+        Optional<String> delayedScaleDown =
+                configMapStore.getSerializedState(jobContext,
DELAYED_SCALE_DOWN);
+        if (delayedScaleDown.isEmpty()) { // nothing persisted yet
+            return new DelayedScaleDown();
+        }
+
+        try {
+            return deserializeDelayedScaleDown(delayedScaleDown.get());
+        } catch (JacksonException e) { // unreadable (e.g. format changed): log, drop stored value, start empty
+            LOG.error(
+                    "Could not deserialize delayed scale down, possibly the
format changed. Discarding...",
+                    e);
+            configMapStore.removeSerializedState(jobContext,
DELAYED_SCALE_DOWN);
+            return new DelayedScaleDown();
+        }
+    }
+
@Override
public void clearAll(KubernetesJobAutoScalerContext jobContext) {
configMapStore.clearAll(jobContext);
@@ -298,6 +333,18 @@ public class KubernetesAutoScalerStateStore
}
}
+    private static String serializeDelayedScaleDown(DelayedScaleDown
delayedScaleDown)
+            throws JacksonException { // Persists only the per-vertex first-trigger-time map
+        return
YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+    }
+
+    private static DelayedScaleDown deserializeDelayedScaleDown(String
delayedScaleDown)
+            throws JacksonException { // Inverse of serializeDelayedScaleDown
+        Map<JobVertexID, Instant> firstTriggerTime =
+                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>()
{});
+        return new DelayedScaleDown(firstTriggerTime);
+    }
+
@VisibleForTesting
protected void trimHistoryToMaxCmSize(KubernetesJobAutoScalerContext
context) {
int scalingHistorySize =