This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 6e16bac00caf631ac6f54cdd8f06c67bfdd82255
Author:     Rui Fan <fan...@apache.org>
AuthorDate: Fri Dec 6 16:41:54 2024 +0800

    [FLINK-36863][autoscaler] Use the maximum parallelism in the past scale-down.interval window when scaling down
---
 .../apache/flink/autoscaler/DelayedScaleDown.java  | 104 ++++++-
 .../apache/flink/autoscaler/JobVertexScaler.java   |  77 ++++-
 .../apache/flink/autoscaler/ScalingExecutor.java   |  51 +---
 .../flink/autoscaler/DelayedScaleDownTest.java     | 173 ++++++++++-
 .../flink/autoscaler/JobVertexScalerTest.java      |  52 ++--
 .../flink/autoscaler/ScalingExecutorTest.java      | 319 +++++++++++++--------
 .../state/AbstractAutoScalerStateStoreTest.java    |   5 +-
 7 files changed, 544 insertions(+), 237 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
index b5a0dba7..489bbc9f 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -30,23 +30,96 @@ import javax.annotation.Nonnull;
 
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
+    /** Details of the recommended parallelism. */
+    @Data
+    public static class RecommendedParallelism {
+        @Nonnull private final Instant triggerTime;
+        private final int parallelism;
+        private final boolean outsideUtilizationBound;
+
+        @JsonCreator
+        public RecommendedParallelism(
+                @Nonnull @JsonProperty("triggerTime") Instant triggerTime,
+                @JsonProperty("parallelism") int parallelism,
+                @JsonProperty("outsideUtilizationBound") boolean outsideUtilizationBound) {
+            this.triggerTime = triggerTime;
+            this.parallelism = parallelism;
+            this.outsideUtilizationBound = outsideUtilizationBound;
+        }
+    }
+
     /** The delayed scale down info for vertex. */
     @Data
    public static class VertexDelayedScaleDownInfo {
         private final Instant firstTriggerTime;
-        private int maxRecommendedParallelism;
+
+        /**
+         * In theory, this maintains all parallelisms recommended within the past
+         * `scale-down.interval` window, so all recommended parallelisms before the window start
+         * time are evicted.
+         *
+         * <p>Also, if the latest recommended parallelism is greater than a past one, the smaller
+         * past parallelism can never be the max recommended parallelism, so all smaller past
+         * parallelisms can be evicted as well. This is the general optimization for computing a
+         * sliding-window maximum. Therefore we only need to maintain a list of monotonically
+         * decreasing parallelisms within the past window, and the first element is the max
+         * recommended parallelism within the past `scale-down.interval` window.
+         */
+        private final LinkedList<RecommendedParallelism> recommendedParallelisms;
+
+        public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.recommendedParallelisms = new LinkedList<>();
+        }
 
         @JsonCreator
         public VertexDelayedScaleDownInfo(
                 @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
-                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+                @JsonProperty("recommendedParallelisms")
+                        LinkedList<RecommendedParallelism> recommendedParallelisms) {
             this.firstTriggerTime = firstTriggerTime;
-            this.maxRecommendedParallelism = maxRecommendedParallelism;
+            this.recommendedParallelisms = recommendedParallelisms;
+        }
+
+        /** Record the current recommended parallelism. */
+        public void recordRecommendedParallelism(
+                Instant triggerTime, int parallelism, boolean outsideUtilizationBound) {
+            // Evict all recommended parallelisms that are lower than or equal to the latest
+            // parallelism. When a past parallelism is equal to the latest parallelism, its
+            // triggerTime needs to be updated, so it also needs to be evicted.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
+                recommendedParallelisms.pollLast();
+            }
+
+            recommendedParallelisms.addLast(
+                    new RecommendedParallelism(triggerTime, parallelism, outsideUtilizationBound));
+        }
+
+        @JsonIgnore
+        public RecommendedParallelism getMaxRecommendedParallelism(Instant windowStartTime) {
+            // Evict all recommended parallelisms before the window start time.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms
+                            .peekFirst()
+                            .getTriggerTime()
+                            .isBefore(windowStartTime)) {
+                recommendedParallelisms.pollFirst();
+            }
+
+            var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
+            checkState(
+                    maxRecommendedParallelism != null,
+                    "getMaxRecommendedParallelism should only be called after a scale down has been triggered; otherwise this may be a bug.");
+            return maxRecommendedParallelism;
+        }
     }
 
@@ -63,18 +136,19 @@ public class DelayedScaleDown {
 
     /** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
     @Nonnull
     public VertexDelayedScaleDownInfo triggerScaleDown(
-            JobVertexID vertex, Instant triggerTime, int parallelism) {
-        var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
-        if (vertexDelayedScaleDownInfo == null) {
-            // It's the first trigger
-            vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
-            delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
-            updated = true;
-        } else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
-            // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
-            vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
-            updated = true;
-        }
+            JobVertexID vertex,
+            Instant triggerTime,
+            int parallelism,
+            boolean outsideUtilizationBound) {
+        // The vertexDelayedScaleDownInfo is updated every time a scale down is triggered, because
+        // the triggerTime needs to be updated each time.
+        updated = true;
+
+        var vertexDelayedScaleDownInfo =
+                delayedVertices.computeIfAbsent(
+                        vertex, k -> new VertexDelayedScaleDownInfo(triggerTime));
+        vertexDelayedScaleDownInfo.recordRecommendedParallelism(
+                triggerTime, parallelism, outsideUtilizationBound);
 
         return vertexDelayedScaleDownInfo;
     }
 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 41075b7f..4c185f89 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -56,6 +56,8 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESS
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -92,12 +94,15 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
+        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1, false);
 
         private final int newParallelism;
 
-        private ParallelismChange(int newParallelism) {
+        private final boolean outsideUtilizationBound;
+
+        private ParallelismChange(int newParallelism, boolean outsideUtilizationBound) {
             this.newParallelism = newParallelism;
+            this.outsideUtilizationBound = outsideUtilizationBound;
         }
 
         public boolean isNoChange() {
@@ -113,24 +118,29 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
                 return false;
             }
             ParallelismChange that = (ParallelismChange) o;
-            return newParallelism == that.newParallelism;
+            return newParallelism == that.newParallelism
+                    && outsideUtilizationBound == that.outsideUtilizationBound;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(newParallelism);
+            return Objects.hash(newParallelism, outsideUtilizationBound);
         }
 
         @Override
         public String toString() {
             return isNoChange()
                     ? "NoParallelismChange"
-                    : "ParallelismChange{newParallelism=" + newParallelism + '}';
+                    : "ParallelismChange{newParallelism="
+                            + newParallelism
+                            + ", outsideUtilizationBound="
+                            + outsideUtilizationBound
+                            + "}";
         }
 
-        public static ParallelismChange build(int newParallelism) {
+        public static ParallelismChange build(int newParallelism, boolean outsideUtilizationBound) {
             checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
-            return new ParallelismChange(newParallelism);
+            return new ParallelismChange(newParallelism, outsideUtilizationBound);
         }
 
         public static ParallelismChange noChange() {
@@ -239,6 +249,8 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
                 currentParallelism != newParallelism,
                 "The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");
 
+        var outsideUtilizationBound = outsideUtilizationBound(vertex, evaluatedMetrics);
+
         var scaledUp = currentParallelism < newParallelism;
 
         if (scaledUp) {
@@ -248,7 +260,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
             // If we don't have past scaling actions for this vertex, don't block scale up.
             if (history.isEmpty()) {
-                return ParallelismChange.build(newParallelism);
+                return ParallelismChange.build(newParallelism, outsideUtilizationBound);
             }
 
             var lastSummary = history.get(history.lastKey());
@@ -260,28 +272,59 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
                 return ParallelismChange.noChange();
             }
 
-            return ParallelismChange.build(newParallelism);
+            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
+        } else {
+            return applyScaleDownInterval(
+                    delayedScaleDown, vertex, conf, newParallelism, outsideUtilizationBound);
+        }
+    }
+
+    private static boolean outsideUtilizationBound(
+            JobVertexID vertex, Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
+        double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+        double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+        double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
+        if (trueProcessingRate < scaleUpRateThreshold
+                || trueProcessingRate > scaleDownRateThreshold) {
+            LOG.debug(
+                    "Vertex {} processing rate {} is outside ({}, {})",
+                    vertex,
+                    trueProcessingRate,
+                    scaleUpRateThreshold,
+                    scaleDownRateThreshold);
+            return true;
         } else {
-            return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
+            LOG.debug(
+                    "Vertex {} processing rate {} is within target ({}, {})",
+                    vertex,
+                    trueProcessingRate,
+                    scaleUpRateThreshold,
+                    scaleDownRateThreshold);
         }
+        return false;
     }
 
     private ParallelismChange applyScaleDownInterval(
             DelayedScaleDown delayedScaleDown,
             JobVertexID vertex,
             Configuration conf,
-            int newParallelism) {
+            int newParallelism,
+            boolean outsideUtilizationBound) {
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.build(newParallelism);
+            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
         }
 
         var now = clock.instant();
-        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+        var windowStartTime = now.minus(scaleDownInterval);
+        var delayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(
+                        vertex, now, newParallelism, outsideUtilizationBound);
 
         // Never scale down within scale down interval
-        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+        if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
             if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
                 LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
             } else {
@@ -293,7 +336,11 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
             }
         } else {
             // Using the maximum parallelism within the scale down interval window instead of the
             // latest parallelism when scaling down
-            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
+            var maxRecommendedParallelism =
+                    delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
+            return ParallelismChange.build(
+                    maxRecommendedParallelism.getParallelism(),
+                    maxRecommendedParallelism.isOutsideUtilizationBound());
         }
     }
 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index af8384c4..a9b04276 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -47,8 +47,8 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
@@ -57,9 +57,6 @@ import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_E
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
 public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
@@ -178,44 +175,6 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                                         scalingSummary.getNewParallelism())));
     }
 
-    @VisibleForTesting
-    static boolean allChangedVerticesWithinUtilizationTarget(
-            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
-            Set<JobVertexID> changedVertices) {
-        // No vertices with changed parallelism.
-        if (changedVertices.isEmpty()) {
-            return true;
-        }
-
-        for (JobVertexID vertex : changedVertices) {
-            var metrics = evaluatedMetrics.get(vertex);
-
-            double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
-            double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
-            double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
-
-            if (trueProcessingRate < scaleUpRateThreshold
-                    || trueProcessingRate > scaleDownRateThreshold) {
-                LOG.debug(
-                        "Vertex {} processing rate {} is outside ({}, {})",
-                        vertex,
-                        trueProcessingRate,
-                        scaleUpRateThreshold,
-                        scaleDownRateThreshold);
-                return false;
-            } else {
-                LOG.debug(
-                        "Vertex {} processing rate {} is within target ({}, {})",
-                        vertex,
-                        trueProcessingRate,
-                        scaleUpRateThreshold,
-                        scaleDownRateThreshold);
-            }
-        }
-        LOG.info("All vertex processing rates are within target.");
-        return true;
-    }
-
     @VisibleForTesting
     Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Context context,
@@ -235,6 +194,7 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
 
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        AtomicBoolean anyVertexOutsideBound = new AtomicBoolean(false);
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -260,6 +220,9 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                             if (parallelismChange.isNoChange()) {
                                 return;
                             }
+                            if (parallelismChange.isOutsideUtilizationBound()) {
+                                anyVertexOutsideBound.set(true);
+                            }
                             out.put(
                                     v,
                                     new ScalingSummary(
@@ -270,8 +233,8 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                         });
 
         // If the Utilization of all tasks is within range, we can skip scaling.
-        if (allChangedVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), out.keySet())) {
+        if (!anyVertexOutsideBound.get()) {
+            LOG.info("All vertex processing rates are within target.");
             return Map.of();
         }
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
index f27ad5b0..8adc62bb 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
@@ -24,12 +24,153 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link DelayedScaleDown}. */
 public class DelayedScaleDownTest {
 
     private final JobVertexID vertex = new JobVertexID();
 
+    @Test
+    void testWrongWindowStartTime() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+
+        // The first trigger time is used as the trigger time, and it won't be updated.
+        var vertexDelayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5, true);
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
+
+        // Get the max recommended parallelism from a wrong window: there is no recommended
+        // parallelism since that window start time.
+        assertThatThrownBy(
+                        () ->
+                                vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(
+                                        instant.plusSeconds(1)))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void testMaxRecommendedParallelismForInitialWindow() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+        assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+        delayedScaleDown.triggerScaleDown(vertex, instant, 5, true);
+        delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 9, false);
+        delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 4, true);
+        delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 5, true);
+        var vertexDelayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 6, true);
+
+        // [5, 9, 4, 5, 6] -> 9
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 9, false),
+                instant);
+        // 5, [9, 4, 5, 6] -> 9
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 9, false),
+                instant.plusSeconds(1));
+        // 5, 9, [4, 5, 6] -> 6
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+                instant.plusSeconds(2));
+        // 5, 9, 4, [5, 6] -> 6
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+                instant.plusSeconds(3));
+        // 5, 9, 4, 5, [6] -> 6
+        assertVertexDelayedScaleDownInfo(
+                vertexDelayedScaleDownInfo,
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+                instant.plusSeconds(4));
+    }
+
+    @Test
+    void testMaxRecommendedParallelismForSlidingWindow() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+        assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+        // [5] -> 5
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
+        // [5, 8] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 8, false),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant);
+        // [5, 8, 6] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 6, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant);
+        // [5, 8, 6, 4] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 4, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant);
+        // 5, [8, 6, 4, 3] -> 8
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+                instant.plusSeconds(1));
+        // 5, 8, [6, 4, 3, 3] -> 6
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(2), 6, true),
+                instant.plusSeconds(2));
+        // 5, 8, 6, [4, 3, 3, 3] -> 4
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(6), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(3), 4, true),
+                instant.plusSeconds(3));
+
+        // Check that the timestamp of the latest parallelism is maintained correctly when the
+        // recommended parallelism stays the same.
+        // 5, 8, 6, 4, [3, 3, 3, 3] -> 3
+        // 5, 8, 6, 4, 3, [3, 3, 3] -> 3
+        // 5, 8, 6, 4, 3, 3, [3, 3] -> 3
+        // 5, 8, 6, 4, 3, 3, 3, [3] -> 3
+        var vertexDelayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(7), 3, true);
+        for (int offset = 4; offset <= 7; offset++) {
+            assertVertexDelayedScaleDownInfo(
+                    vertexDelayedScaleDownInfo,
+                    instant,
+                    new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(7), 3, true),
+                    instant.plusSeconds(offset));
+        }
+        // 5, 8, 6, 4, 3, 3, 3, [3, 9] -> 9
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(9), 9, false),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(9), 9, false),
+                instant.plusSeconds(7));
+    }
+
     @Test
     void testTriggerUpdateAndClean() {
         var instant = Instant.now();
@@ -38,40 +179,52 @@ public class DelayedScaleDownTest {
 
         // First trigger time as the trigger time, and it won't be updated.
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
         assertThat(delayedScaleDown.isUpdated()).isTrue();
 
         // The lower parallelism doesn't update the result
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+                instant);
 
         // The higher parallelism will update the result
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8, true),
+                instant,
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(10), 8, true),
+                instant);
 
         // The scale down could be re-triggered again after clean
         delayedScaleDown.clearVertex(vertex);
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4, true),
                 instant.plusSeconds(15),
-                4);
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(15), 4, true),
+                instant);
 
         // The scale down could be re-triggered again after cleanAll
         delayedScaleDown.clearAll();
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
         assertVertexDelayedScaleDownInfo(
-                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
-                instant.plusSeconds(15),
-                2);
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(16), 2, true),
+                instant.plusSeconds(16),
+                new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(16), 2, true),
+                instant);
     }
 
     void assertVertexDelayedScaleDownInfo(
             DelayedScaleDown.VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo,
             Instant expectedTriggerTime,
-            int expectedMaxRecommendedParallelism) {
+            DelayedScaleDown.RecommendedParallelism expectedMaxRecommendedParallelism,
+            Instant windowStartTime) {
         assertThat(vertexDelayedScaleDownInfo.getFirstTriggerTime()).isEqualTo(expectedTriggerTime);
-        assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism())
+        assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime))
                 .isEqualTo(expectedMaxRecommendedParallelism);
     }
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index af01ce34..9cdc7159 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -104,7 +104,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(5),
+                ParallelismChange.build(5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -116,7 +116,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.build(8),
+                ParallelismChange.build(8, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -140,7 +140,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.build(8),
+                ParallelismChange.build(8, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -151,7 +151,7 @@ public class JobVertexScalerTest {
                         delayedScaleDown));
 
         assertEquals(
-                ParallelismChange.build(8),
+                ParallelismChange.build(8, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -163,7 +163,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, 0.5);
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -175,7 +175,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, 0.6);
         assertEquals(
-                ParallelismChange.build(4),
+                ParallelismChange.build(4, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -188,7 +188,7 @@ public class JobVertexScalerTest {
         conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.build(5),
+                ParallelismChange.build(5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -200,7 +200,7 @@ public class JobVertexScalerTest {
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.build(4),
+                ParallelismChange.build(4, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -213,7 +213,7 @@ public class JobVertexScalerTest {
         conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.build(15),
+                ParallelismChange.build(15, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -225,7 +225,7 @@ public class JobVertexScalerTest {
 
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.build(16),
+                ParallelismChange.build(16, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -533,7 +533,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(5),
+                ParallelismChange.build(5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -593,7 +593,7 @@ public class JobVertexScalerTest {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        assertParallelismChange(10, 50, 100, ParallelismChange.build(5), delayedScaleDown);
+        assertParallelismChange(10, 50, 100, ParallelismChange.build(5, true), delayedScaleDown);
     }
 
     @Test
@@ -625,7 +625,8 @@ public class JobVertexScalerTest {
         // interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(60)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 700, 1000, ParallelismChange.build(90), delayedScaleDown);
+        assertParallelismChange(
+                100, 700, 1000, ParallelismChange.build(90, false), delayedScaleDown);
     }
 
     @Test
@@ -650,7 +651,8 @@ public class JobVertexScalerTest {
         // Allow immediate scale up
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 1700, 1000, ParallelismChange.build(170), delayedScaleDown);
+        assertParallelismChange(
+                100, 1700, 1000, ParallelismChange.build(170, true), delayedScaleDown);
 
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
@@ -710,7 +712,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -725,7 +727,7 @@ public class JobVertexScalerTest {
         // Allow to scale higher if scaling was effective (80%)
        evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20),
+                ParallelismChange.build(20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -767,7 +769,7 @@ public class JobVertexScalerTest {
         // Allow scale up if current parallelism doesnt match last (user rescaled manually)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20),
+                ParallelismChange.build(20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -780,7 +782,7 @@ public class JobVertexScalerTest {
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
-                ParallelismChange.build(36),
+                ParallelismChange.build(36, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -795,7 +797,7 @@ public class JobVertexScalerTest {
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         evaluated = evaluated(20, 180, 90);
         assertEquals(
-                ParallelismChange.build(40),
+                ParallelismChange.build(40, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -810,7 +812,7 @@ public class JobVertexScalerTest {
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -835,7 +837,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(10),
+                ParallelismChange.build(10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -850,7 +852,7 @@ public class JobVertexScalerTest {
         // Effective scale, no events triggered
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20),
+                ParallelismChange.build(20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -945,7 +947,7 @@ public class JobVertexScalerTest {
         // Test ineffective scaling switched off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         assertEquals(
-                ParallelismChange.build(40),
+                ParallelismChange.build(40, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1092,7 +1094,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         // partition limited
         assertEquals(
-                ParallelismChange.build(15),
+                ParallelismChange.build(15, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1115,7 +1117,7 @@ public class JobVertexScalerTest {
         smallChangesForScaleFactor.put(
                 ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
         assertEquals(
-                ParallelismChange.build(15),
+                ParallelismChange.build(15, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index a3bc6746..1b1a10c1 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -50,7 +50,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -120,91 +119,104 @@ public class ScalingExecutorTest {
     }
 
     @Test
-    public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
+    public void testUtilizationBoundaries() {
         // Restart time should not affect utilization boundary
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
-        var op1 = new JobVertexID();
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+        var jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));
 
+        // 0.7 is outside of the utilization bound [0.6 - 0.0, 0.6 + 0.0], so scaling happens
         conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
+        var metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 60, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .hasSize(1);
 
-        var evaluated = Map.of(op1, evaluated(1, 70, 100));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
-
+        // Both 0.7 and 0.6 are within the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling
+        // could be ignored.
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4);
-        evaluated = Map.of(op1, evaluated(1, 70, 100));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
-        assertTrue(getScaledParallelism(stateStore, context).isEmpty());
-
-        var op2 = new JobVertexID();
-        evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 85, 100));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 60, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
 
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        // 0.85 is outside of the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling happens
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .hasSize(2);
 
-        evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 70, 100));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        // Both 0.7 values are within the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling
+        // could be ignored.
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 70, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
 
         // Test with backlog based scaling
-        evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
-    }
-
-    @Test
-    public void testUtilizationBoundariesWithOptionalVertex() {
-        // Restart time should not affect utilization boundary
-        var conf = context.getConfiguration();
-        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
-        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
-        var op1 = new JobVertexID();
-        var op2 = new JobVertexID();
-
-        // All vertices are optional
-        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
-        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
-        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
-
-        var evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 85, 100));
-
-        assertTrue(ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of()));
-
-        // One vertex is required, and it's out of range.
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
-
-        // One vertex is required, and it's within the range.
-        // The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
-        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
-        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
-        evaluated =
-                Map.of(
-                        op1, evaluated(1, 65, 100),
-                        op2, evaluated(1, 85, 100));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100, 15), sink, evaluated(10, 70, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .hasSize(2);
     }
 
     @Test
@@ -230,10 +242,6 @@ public class ScalingExecutorTest {
                         Map.of(vertex, evaluated(parallelism, targetRate, trueProcessingRate)),
                         dummyGlobalMetrics);
 
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated.getVertexMetrics(), evaluated.getVertexMetrics().keySet()));
-
         // Execute the full scaling path
         var now = Instant.now();
         var jobTopology =
@@ -260,83 +268,142 @@ public class ScalingExecutorTest {
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
-        var op1 = new JobVertexID();
-        var op2 = new JobVertexID();
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
+
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+        var jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));
 
-        // All vertices are optional
+        // target 0.6, target boundary 0.1 -> max 0.7, min 0.5
         conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
 
-        var evaluated =
-                Map.of(
-                        op1, evaluated(1, 70, 100),
-                        op2, evaluated(1, 85, 100));
-        // target boundary 0.1, target 0.6, max 0.7, min 0.5
-        boolean boundaryOp1 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
-        boolean boundaryOp2 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
+        var metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(source, sink);
 
-        // Remove target boundary and use min max, should get the same result
+        // target 0.6, max 0.7, min 0.5
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5);
-        boolean minMaxOp1 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
-        boolean minMaxOp2 =
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
-        assertEquals(boundaryOp1, minMaxOp1);
-        assertEquals(boundaryOp2, minMaxOp2);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(source, sink);
 
         // When the target boundary parameter is used,
         // but the min max parameter is also set,
         // the min max parameter shall prevail.
+        // target 0.6, max 0.7, min 0.3
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
-        evaluated =
-                Map.of(
-                        op1, evaluated(2, 150, 100),
-                        op2, evaluated(1, 85, 100));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 60, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(sink);
 
-        // When the target boundary parameter is used,
-        // but the max parameter is also set,
-        conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+        // When the target boundary parameter and max parameter are set,
+        // we expect the min to be derived from the boundary.
+        // target 0.5, max 0.6, min 0
         conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
         conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+        conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
 
-        evaluated =
-                Map.of(
-                        op1, evaluated(2, 100, 99999),
-                        op2, evaluated(1, 80, 99999));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 100, 99999), sink, evaluated(10, 80, 99999)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
 
-        evaluated = Map.of(op2, evaluated(1, 85, 100));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 50, 100), sink, evaluated(10, 85, 100)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(sink);
 
+        // When the target boundary parameter and min parameter are set,
+        // we expect the max to be derived from the boundary.
+        // target 0.5, max 1.0, min 0.3
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
         conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX);
         conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
-        evaluated =
-                Map.of(
-                        op1, evaluated(2, 80, 81),
-                        op2, evaluated(1, 100, 101));
-        assertTrue(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 80, 81), sink, evaluated(10, 100, 101)),
+                        dummyGlobalMetrics);
 
-        evaluated = Map.of(op1, evaluated(1, 80, 79));
-        assertFalse(
-                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
-                        evaluated, evaluated.keySet()));
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .isEmpty();
+
+        metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 80, 79), sink, evaluated(10, 100, 101)),
+                        dummyGlobalMetrics);
+        assertThat(
+                        scalingExecutor.computeScalingSummary(
+                                context,
+                                metrics,
+                                Map.of(),
+                                Duration.ZERO,
+                                jobTopology,
+                                new DelayedScaleDown()))
+                .containsOnlyKeys(source, sink);
     }
 
     @Test
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index d4e0db9a..da2fa097 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -207,8 +207,9 @@ public abstract class AbstractAutoScalerStateStoreTest<
         stateStore.storeScalingTracking(ctx, scalingTracking);
 
         var delayedScaleDown = new DelayedScaleDown();
-        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
-        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
+        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10, true);
+        delayedScaleDown.triggerScaleDown(
+                new JobVertexID(), Instant.now().plusSeconds(10), 12, true);
 
         stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
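
Background for readers: the eviction logic this commit adds to VertexDelayedScaleDownInfo is the classic monotonic-deque technique for computing a sliding-window maximum. The following standalone sketch shows the same idea in isolation; it is illustrative only, and the SlidingWindowMax name and its methods are invented for this example rather than taken from the commit:

    import java.time.Instant;
    import java.util.ArrayDeque;
    import java.util.Deque;

    /** Sliding-window maximum via a deque whose values decrease from head to tail. */
    final class SlidingWindowMax {

        private static final class Sample {
            final Instant time;
            final int value;

            Sample(Instant time, int value) {
                this.time = time;
                this.value = value;
            }
        }

        private final Deque<Sample> samples = new ArrayDeque<>();

        /** Record a sample; drop tail samples that can never become the maximum again. */
        void record(Instant time, int value) {
            // An older sample with value <= the new one expires earlier and is not larger,
            // so it can never be the window maximum; evict it, just as
            // recordRecommendedParallelism evicts dominated parallelisms above.
            while (!samples.isEmpty() && samples.peekLast().value <= value) {
                samples.pollLast();
            }
            samples.addLast(new Sample(time, value));
        }

        /** Maximum of all samples at or after windowStart, mirroring getMaxRecommendedParallelism. */
        int max(Instant windowStart) {
            // Evict head samples that have fallen out of the window.
            while (!samples.isEmpty() && samples.peekFirst().time.isBefore(windowStart)) {
                samples.pollFirst();
            }
            if (samples.isEmpty()) {
                throw new IllegalStateException("max() called with no sample inside the window");
            }
            return samples.peekFirst().value;
        }
    }

Because every sample is added and evicted at most once, a sequence of record/max calls costs amortized O(1) per call regardless of the window length, which is why the commit can track the maximum recommended parallelism over the whole scale-down.interval window without rescanning it on every trigger.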