This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit cd55bdc71e79b021c5ab4db2b827b9ae54db730e
Author: Rui Fan <fan...@apache.org>
AuthorDate: Mon Dec 16 16:52:22 2024 +0800

    [FLINK-36863][autoscaler] Add the end-to-end test for delayed scale down
---
 .../autoscaler/DelayedScaleDownEndToEndTest.java   | 724 +++++++++++++++++++++
 1 file changed, 724 insertions(+)

diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java
new file mode 100644
index 00000000..420d9df3
--- /dev/null
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.TestMetrics;
+import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.getRestClusterClientSupplier;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end test for {@link DelayedScaleDown}. */
+public class DelayedScaleDownEndToEndTest {
+
+    private static final int INITIAL_SOURCE_PARALLELISM = 200;
+    private static final int INITIAL_SINK_PARALLELISM = 1000;
+    private static final double UTILIZATION_TARGET = 0.8;
+
+    private JobAutoScalerContext<JobID> context;
+    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+
+    TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer;
+
+    private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> 
metricsCollector;
+
+    private JobVertexID source, sink;
+
+    private JobAutoScalerImpl<JobID, JobAutoScalerContext<JobID>> autoscaler;
+    private Instant now;
+    private int expectedMetricSize;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        context = createDefaultJobAutoScalerContext();
+
+        TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector =
+                new TestingEventCollector<>();
+        stateStore = new InMemoryAutoScalerStateStore<>();
+
+        source = new JobVertexID();
+        sink = new JobVertexID();
+
+        metricsCollector =
+                new TestingMetricsCollector<>(
+                        new JobTopology(
+                                new VertexInfo(source, Map.of(), 
INITIAL_SOURCE_PARALLELISM, 4000),
+                                new VertexInfo(
+                                        sink,
+                                        Map.of(source, REBALANCE),
+                                        INITIAL_SINK_PARALLELISM,
+                                        4000)));
+
+        var scaleDownInterval = 
Duration.ofMinutes(60).minus(Duration.ofSeconds(1));
+        // The metric window size is 9:59 to avoid other metrics are mixed.
+        var metricWindow = Duration.ofMinutes(10).minus(Duration.ofSeconds(1));
+
+        var defaultConf = context.getConfiguration();
+        defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, 
Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(0));
+        defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, 
Duration.ofSeconds(0));
+        defaultConf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10000);
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 
UTILIZATION_TARGET);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, UTILIZATION_TARGET 
+ 0.1);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, UTILIZATION_TARGET 
- 0.1);
+        defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, 
scaleDownInterval);
+        defaultConf.set(AutoScalerOptions.METRICS_WINDOW, metricWindow);
+
+        scalingRealizer = new TestingScalingRealizer<>();
+        autoscaler =
+                new JobAutoScalerImpl<>(
+                        metricsCollector,
+                        new ScalingMetricEvaluator(),
+                        new ScalingExecutor<>(eventCollector, stateStore),
+                        eventCollector,
+                        scalingRealizer,
+                        stateStore);
+
+        // initially the last evaluated metrics are empty
+        
assertThat(autoscaler.lastEvaluatedMetrics.get(context.getJobKey())).isNull();
+
+        now = Instant.ofEpochMilli(0);
+        setClocksTo(now);
+        running(now);
+
+        metricsCollector.updateMetrics(source, buildMetric(0, 800));
+        metricsCollector.updateMetrics(sink, buildMetric(0, 800));
+
+        // the recommended parallelism values are empty initially
+        autoscaler.scale(context);
+        expectedMetricSize = 1;
+        assertCollectedMetricsSize(expectedMetricSize);
+    }
+
+    /**
+     * The scale down won't be executed before scale down interval window is 
full, and it will use
+     * the max recommended parallelism in the past scale down interval window 
size when scale down
+     * is executed.
+     */
+    @Test
+    void testDelayedScaleDownHappenInLastMetricWindow() throws Exception {
+        var sourceBusyList = List.of(800, 800, 800, 800, 800, 800, 800);
+        var sinkBusyList = List.of(350, 300, 150, 200, 400, 250, 100);
+
+        var metricWindowSize = sourceBusyList.size();
+
+        assertThat(metricWindowSize).isGreaterThan(6);
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 4800000;
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                metricsCollector.updateMetrics(
+                        source, buildMetric(totalRecords, 
sourceBusyList.get(windowIndex)));
+                metricsCollector.updateMetrics(
+                        sink, buildMetric(totalRecords, 
sinkBusyList.get(windowIndex)));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                // Assert the recommended parallelism.
+                if (windowIndex == metricWindowSize - 1 && i == 10) {
+                    // Last metric, we expect scale down is executed, and max 
recommended
+                    // parallelism in the past window should be used.
+                    // The max busy time needs more parallelism than others, 
so we could compute
+                    // parallelism based on the max busy time.
+                    var expectedSourceParallelism =
+                            getExpectedParallelism(sourceBusyList, 
INITIAL_SOURCE_PARALLELISM);
+                    var expectedSinkParallelism =
+                            getExpectedParallelism(sinkBusyList, 
INITIAL_SINK_PARALLELISM);
+                    pollAndAssertScalingRealizer(
+                            expectedSourceParallelism, 
expectedSinkParallelism);
+                } else {
+                    // Otherwise, scale down cannot be executed.
+                    if (windowIndex == 0 && i <= 9) {
+                        // Metric window is not full, so don't have 
recommended parallelism.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                    } else {
+                        // Scale down won't be executed before scale down 
interval window is full.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SINK_PARALLELISM);
+                    }
+                    assertThat(scalingRealizer.events).isEmpty();
+                }
+
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    private static List<List<Integer>>
+            
sinkParallelismMaxRecommendedParallelismWithinUtilizationBoundaryProvider() {
+        return List.of(
+                List.of(
+                        700, 690, 690, 690, 690, 690, 700, 690, 690, 690, 690, 
690, 700, 690, 690,
+                        690, 690, 690, 700, 690, 690, 690, 690, 690, 700, 690, 
690, 690, 690, 690,
+                        700),
+                List.of(700, 690, 700, 690, 700, 690, 700, 690, 700, 690, 700, 
690, 700, 690, 700),
+                List.of(
+                        790, 200, 200, 200, 200, 200, 790, 200, 200, 200, 200, 
200, 790, 200, 200,
+                        200, 200, 200, 790, 200, 200, 200, 200, 200, 790, 200, 
200, 200, 200, 200,
+                        790),
+                List.of(790, 200, 790, 200, 790, 200, 790, 200, 790, 200, 790, 
200, 790, 200, 790));
+    }
+
+    /**
+     * Job never scale down when the max recommended parallelism in the past 
scale down interval
+     * window is inside the utilization boundary.
+     */
+    @ParameterizedTest
+    
@MethodSource("sinkParallelismMaxRecommendedParallelismWithinUtilizationBoundaryProvider")
+    void 
testScaleDownNeverHappenWhenMaxRecommendedParallelismWithinUtilizationBoundary(
+            List<Integer> sinkBusyList) throws Exception {
+        var metricWindowSize = sinkBusyList.size();
+        var sourceBusyList = Collections.nCopies(metricWindowSize, 800);
+
+        assertThat(sourceBusyList).hasSameSizeAs(sinkBusyList);
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 4800000;
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                metricsCollector.updateMetrics(
+                        source, buildMetric(totalRecords, 
sourceBusyList.get(windowIndex)));
+                metricsCollector.updateMetrics(
+                        sink, buildMetric(totalRecords, 
sinkBusyList.get(windowIndex)));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                // Assert the recommended parallelism.
+                if (windowIndex == 0 && i <= 9) {
+                    // Metric window is not full, so don't have recommended 
parallelism.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                } else {
+                    // Scale down won't be executed before scale down interval 
window is full.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                            .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                            .isEqualTo(INITIAL_SINK_PARALLELISM);
+                }
+                assertThat(scalingRealizer.events).isEmpty();
+
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    private static Stream<Arguments> 
scaleDownUtilizationBoundaryFromWithinToOutsideProvider() {
+        return Stream.of(
+                Arguments.of(
+                        List.of(
+                                780, 780, 780, 780, 780, 780, 780, 780, 780, 
780, 780, 780, 780,
+                                780, 780, 780, 780),
+                        List.of(
+                                800, 790, 780, 770, 760, 750, 740, 730, 720, 
710, 700, 690, 680,
+                                670, 660, 660, 660)),
+                Arguments.of(
+                        List.of(
+                                780, 780, 780, 780, 780, 780, 780, 780, 780, 
780, 780, 780, 780,
+                                780, 780, 780, 780),
+                        List.of(
+                                700, 700, 700, 700, 700, 700, 700, 700, 700, 
700, 700, 690, 680,
+                                670, 660, 660, 660)),
+                Arguments.of(
+                        List.of(
+                                780, 780, 780, 780, 780, 780, 780, 780, 780, 
780, 780, 780, 780,
+                                780, 780, 780, 780, 780),
+                        List.of(
+                                800, 790, 790, 790, 790, 790, 790, 790, 790, 
790, 790, 690, 680,
+                                670, 660, 660, 660, 660)),
+                Arguments.of(
+                        List.of(
+                                780, 780, 780, 780, 780, 780, 780, 780, 780, 
780, 780, 780, 780,
+                                780, 780, 780, 780),
+                        List.of(
+                                800, 790, 780, 770, 760, 750, 740, 730, 720, 
710, 700, 100, 100,
+                                100, 100, 100, 100)),
+                Arguments.of(
+                        List.of(
+                                780, 780, 780, 780, 780, 780, 780, 780, 780, 
780, 780, 780, 780,
+                                780, 780, 780, 780, 780, 780, 780, 780, 780, 
780),
+                        // only 50 minutes are outside of bound in the first 
time
+                        List.of(
+                                800, 790, 780, 770, 760, 750, 740, 730, 720, 
710, 700, 200, 200,
+                                200, 200, 200, 700, 100, 100, 100, 100, 100, 
100)));
+    }
+
+    /**
+     * Initially, all tasks are scaled down within the utilization bound, and 
scaling down is only
+     * executed when the max recommended parallelism in the past scale down 
interval for any task is
+     * outside the utilization bound.
+     */
+    @ParameterizedTest
+    @MethodSource("scaleDownUtilizationBoundaryFromWithinToOutsideProvider")
+    void testScaleDownUtilizationBoundaryFromWithinToOutside(
+            List<Integer> sourceBusyList, List<Integer> sinkBusyList) throws 
Exception {
+
+        assertThat(sourceBusyList).hasSameSizeAs(sinkBusyList);
+        var metricWindowSize = sourceBusyList.size();
+
+        assertThat(metricWindowSize).isGreaterThan(6);
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 4800000;
+        var sinkBusyWindow = new LinkedList<Integer>();
+        var sinkRecommendationOutsideBound = new LinkedList<Integer>();
+        var sourceBusyWindow = new LinkedList<Integer>();
+        var sourceRecommendation = new LinkedList<Integer>();
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                var sourceBusyTimePerSec = sourceBusyList.get(windowIndex);
+                var sinkBusyTimePerSec = sinkBusyList.get(windowIndex);
+                sourceBusyWindow.add(sourceBusyTimePerSec);
+                sinkBusyWindow.add(sinkBusyTimePerSec);
+                // Poll the oldest metric.
+                if (sinkBusyWindow.size() > 10) {
+                    sinkBusyWindow.pollFirst();
+                    sourceBusyWindow.pollFirst();
+                }
+
+                metricsCollector.updateMetrics(
+                        source, buildMetric(totalRecords, 
sourceBusyList.get(windowIndex)));
+                metricsCollector.updateMetrics(
+                        sink, buildMetric(totalRecords, 
sinkBusyList.get(windowIndex)));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                if (windowIndex == 0 && i <= 9) {
+                    // Metric window is not full, so don't have recommended 
parallelism.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(scalingRealizer.events).isEmpty();
+                } else {
+                    double sourceBusyAvg =
+                            sourceBusyWindow.stream().mapToInt(e -> 
e).average().getAsDouble();
+                    double sinkBusyAvg =
+                            sinkBusyWindow.stream().mapToInt(e -> 
e).average().getAsDouble();
+                    if (sinkBusyAvg < 700) {
+                        var expectedSourceParallelism =
+                                getExpectedParallelism(sourceBusyAvg, 
INITIAL_SOURCE_PARALLELISM);
+                        var expectedSinkParallelism =
+                                getExpectedParallelism(sinkBusyAvg, 
INITIAL_SINK_PARALLELISM);
+                        sourceRecommendation.add(expectedSourceParallelism);
+                        
sinkRecommendationOutsideBound.add(expectedSinkParallelism);
+                    } else {
+                        sourceRecommendation.clear();
+                        sinkRecommendationOutsideBound.clear();
+                    }
+
+                    // Assert the recommended parallelism.
+                    // why it's 60 instead of 59
+                    if (sinkRecommendationOutsideBound.size() >= 60) {
+                        // Last metric, we expect scale down is executed, and 
max recommended
+                        // parallelism in the past window should be used.
+                        // The max busy time needs more parallelism than 
others, so we could compute
+                        // parallelism based on the max busy time.
+                        var expectedSourceParallelism =
+                                sourceRecommendation.stream().mapToInt(e -> 
e).max().getAsInt();
+                        var expectedSinkParallelism =
+                                sinkRecommendationOutsideBound.stream()
+                                        .mapToInt(e -> e)
+                                        .max()
+                                        .getAsInt();
+                        pollAndAssertScalingRealizer(
+                                expectedSourceParallelism, 
expectedSinkParallelism);
+                        break;
+                    } else {
+                        // Otherwise, scale down cannot be executed.
+                        // Scale down won't be executed before scale down 
interval window is full.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SINK_PARALLELISM);
+                        assertThat(scalingRealizer.events).isEmpty();
+                    }
+                }
+
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    /** The scale down trigger time will be reset, when other tasks scale up. 
*/
+    @Test
+    void testDelayedScaleDownIsResetWhenAnotherTaskScaleUp() throws Exception {
+        // It expects delayed scale down of sink is reset when scale up is 
executed for source.
+        var sourceBusyList = List.of(800, 800, 800, 800, 950);
+        var sinkBusyList = List.of(200, 200, 200, 200, 200);
+
+        var sourceBusyWindow = new LinkedList<Integer>();
+
+        var metricWindowSize = sourceBusyList.size();
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 4800000;
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                var busyTimePerSec = sourceBusyList.get(windowIndex);
+                sourceBusyWindow.add(busyTimePerSec);
+                // Poll the oldest metric.
+                if (sourceBusyWindow.size() > 10) {
+                    sourceBusyWindow.pollFirst();
+                }
+
+                metricsCollector.updateMetrics(source, 
buildMetric(totalRecords, busyTimePerSec));
+                metricsCollector.updateMetrics(
+                        sink, buildMetric(totalRecords, 
sinkBusyList.get(windowIndex)));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                // Assert the recommended parallelism.
+                if (windowIndex == 0 && i <= 9) {
+                    // Metric window is not full, so don't have recommended 
parallelism.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(scalingRealizer.events).isEmpty();
+                } else {
+                    double busyAvg =
+                            sourceBusyWindow.stream().mapToInt(e -> 
e).average().getAsDouble();
+                    if (busyAvg > 900) {
+                        // Scaling up happens for source, and the scale down 
of sink cannot be
+                        // executed since the scale down interval window is 
not full.
+                        var sourceMaxBusyRatio = busyAvg / 1000;
+                        var expectedSourceParallelism =
+                                (int)
+                                        Math.ceil(
+                                                INITIAL_SOURCE_PARALLELISM
+                                                        * sourceMaxBusyRatio
+                                                        / UTILIZATION_TARGET);
+                        pollAndAssertScalingRealizer(
+                                expectedSourceParallelism, 
INITIAL_SINK_PARALLELISM);
+                        // The delayed scale down should be cleaned up after 
source scales up.
+                        
assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+                                .isEmpty();
+                        break;
+                    } else {
+                        // Scale down won't be executed before source 
utilization within the
+                        // utilization bound.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SINK_PARALLELISM);
+                        assertThat(scalingRealizer.events).isEmpty();
+                        // Delayed scale down is triggered
+                        
assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+                                .isNotEmpty();
+                    }
+                }
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    /** The scale down trigger time will be reset, when other tasks scale 
down. */
+    @Test
+    void testDelayedScaleDownIsResetWhenAnotherTaskScaleDown() throws 
Exception {
+        // It expects delayed scale down of sink is reset when scale down is 
executed for source.
+        var sourceBusyList = List.of(200, 200, 200, 200, 200, 200, 200);
+        var sinkBusyList = List.of(800, 800, 200, 200, 200, 200, 200);
+
+        var metricWindowSize = sourceBusyList.size();
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 4800000;
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                metricsCollector.updateMetrics(
+                        source, buildMetric(totalRecords, 
sourceBusyList.get(windowIndex)));
+                metricsCollector.updateMetrics(
+                        sink, buildMetric(totalRecords, 
sinkBusyList.get(windowIndex)));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                // Assert the recommended parallelism.
+                if (windowIndex == 0 && i <= 9) {
+                    // Metric window is not full, so don't have recommended 
parallelism.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(scalingRealizer.events).isEmpty();
+                } else {
+                    if (windowIndex == metricWindowSize - 1 && i == 10) {
+                        // Scaling up happens for source, and the scale down 
of sink cannot be
+                        // executed since the scale down interval window is 
not full.
+                        var expectedSourceParallelism =
+                                getExpectedParallelism(sourceBusyList, 
INITIAL_SOURCE_PARALLELISM);
+                        pollAndAssertScalingRealizer(
+                                expectedSourceParallelism, 
INITIAL_SINK_PARALLELISM);
+                        // The delayed scale down should be cleaned up after 
source scales down.
+                        
assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+                                .isEmpty();
+                    } else {
+                        // Scale down won't be executed when scale down 
interval window of source is
+                        // not full.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SINK_PARALLELISM);
+                        assertThat(scalingRealizer.events).isEmpty();
+                        // Delayed scale down is triggered
+                        
assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+                                .isNotEmpty();
+                    }
+                }
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    private static List<List<Integer>> 
sinkParallelismIsGreaterOrEqualProvider() {
+        return List.of(
+                // test for the recommended parallelism is equal to the 
current parallelism.
+                List.of(200, 200, 200, 200, 800),
+                List.of(700, 700, 700, 700, 800),
+                List.of(780, 780, 780, 780, 800),
+                List.of(790, 800),
+                List.of(760, 800),
+                List.of(750, 800),
+                List.of(350, 800),
+                // test for the recommended parallelism is greater than 
current parallelism.
+                List.of(200, 200, 200, 200, 850),
+                List.of(700, 700, 700, 700, 850),
+                List.of(780, 780, 780, 780, 850),
+                List.of(790, 850),
+                List.of(760, 850),
+                List.of(750, 850),
+                List.of(350, 850),
+                List.of(200, 200, 200, 200, 900),
+                List.of(700, 700, 700, 700, 900),
+                List.of(780, 780, 780, 780, 900),
+                List.of(790, 900),
+                List.of(760, 900),
+                List.of(750, 900),
+                List.of(350, 900));
+    }
+
+    /**
+     * The triggered scale down of sink will be canceled when the recommended 
parallelism is greater
+     * than or equal to the current parallelism.
+     */
+    @ParameterizedTest
+    @MethodSource("sinkParallelismIsGreaterOrEqualProvider")
+    void 
testDelayedScaleDownIsCanceledWhenRecommendedParallelismIsGreaterOrEqual(
+            List<Integer> sinkBusyList) throws Exception {
+        var metricWindowSize = sinkBusyList.size();
+        var sourceBusyList = Collections.nCopies(metricWindowSize, 800);
+
+        var sinkBusyWindow = new LinkedList<Integer>();
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 480000000;
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                var busyTimePerSec = sinkBusyList.get(windowIndex);
+                sinkBusyWindow.add(busyTimePerSec);
+                // Poll the oldest metric.
+                if (sinkBusyWindow.size() > 10) {
+                    sinkBusyWindow.pollFirst();
+                }
+
+                metricsCollector.updateMetrics(
+                        source, buildMetric(totalRecords, 
sourceBusyList.get(windowIndex)));
+                metricsCollector.updateMetrics(sink, buildMetric(totalRecords, 
busyTimePerSec));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                assertThat(scalingRealizer.events).isEmpty();
+
+                // Assert the recommended parallelism.
+                if (windowIndex == 0 && i <= 9) {
+                    // Metric window is not full, so don't have recommended 
parallelism.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                } else {
+                    // Scale down won't be executed before scale down interval 
window is full.
+                    assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                            .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                    assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                            .isEqualTo(INITIAL_SINK_PARALLELISM);
+
+                    double sinkBusyAvg =
+                            sinkBusyWindow.stream().mapToInt(e -> 
e).average().getAsDouble();
+                    if (sinkBusyAvg >= 800) {
+                        // The delayed scale down should be cleaned up after 
the expected
+                        // recommended parallelism is greater than or equal to 
the current
+                        // parallelism.
+                        
assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+                                .isEmpty();
+                        break;
+                    } else {
+                        // Delayed scale down is triggered
+                        
assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+                                .isNotEmpty();
+                    }
+                }
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    private static int getExpectedParallelism(List<Integer> taskBusyList, int 
currentParallelism) {
+        var maxBusyTime =
+                taskBusyList.stream()
+                        .skip(taskBusyList.size() - 6)
+                        .max(Comparator.naturalOrder())
+                        .get();
+        return getExpectedParallelism(maxBusyTime, currentParallelism);
+    }
+
+    private static int getExpectedParallelism(double busyTime, int 
currentParallelism) {
+        var maxBusyRatio = busyTime / 1000;
+        return (int) Math.ceil(currentParallelism * maxBusyRatio / 
UTILIZATION_TARGET);
+    }
+
+    private void pollAndAssertScalingRealizer(
+            int expectedSourceParallelism, int expectedSinkParallelism) {
+        // Assert metric
+        assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+                .isEqualTo(expectedSourceParallelism);
+        assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+                .isEqualTo(expectedSinkParallelism);
+
+        // Check scaling realizer.
+        assertThat(scalingRealizer.events).hasSize(1);
+        var parallelismOverrides = 
scalingRealizer.events.poll().getParallelismOverrides();
+        assertThat(parallelismOverrides)
+                .containsEntry(source.toHexString(), 
Integer.toString(expectedSourceParallelism));
+        assertThat(parallelismOverrides)
+                .containsEntry(sink.toHexString(), 
Integer.toString(expectedSinkParallelism));
+    }
+
+    private void assertCollectedMetricsSize(int expectedSize) throws Exception 
{
+        
assertThat(stateStore.getCollectedMetrics(context)).hasSize(expectedSize);
+    }
+
+    private Double getCurrentMetricValue(JobVertexID jobVertexID, 
ScalingMetric scalingMetric) {
+        var metric =
+                autoscaler
+                        .lastEvaluatedMetrics
+                        .get(context.getJobKey())
+                        .getVertexMetrics()
+                        .get(jobVertexID)
+                        .get(scalingMetric);
+        return metric == null ? null : metric.getCurrent();
+    }
+
+    private void running(Instant now) {
+        metricsCollector.setJobUpdateTs(now);
+        context =
+                new JobAutoScalerContext<>(
+                        context.getJobKey(),
+                        context.getJobID(),
+                        JobStatus.RUNNING,
+                        context.getConfiguration(),
+                        context.getMetricGroup(),
+                        getRestClusterClientSupplier());
+    }
+
+    private void setClocksTo(Instant time) {
+        var clock = Clock.fixed(time, ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        autoscaler.setClock(clock);
+    }
+
+    private TestMetrics buildMetric(long totalRecords, int busyTimePerSec) {
+        return TestMetrics.builder()
+                .numRecordsIn(totalRecords)
+                .numRecordsOut(totalRecords)
+                .maxBusyTimePerSec(busyTimePerSec)
+                .build();
+    }
+}


Reply via email to