This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f9ec6361 [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric
f9ec6361 is described below

commit f9ec6361f09df4f5d34cd10185538a89d9417b4a
Author: Matyas Orhidi <[email protected]>
AuthorDate: Thu Jun 8 17:03:56 2023 -0700

    [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric
---
 .../autoscaler/AutoscalerFlinkMetrics.java         |  12 +-
 .../operator/autoscaler/JobAutoScalerImpl.java     |  42 ++-
 .../operator/autoscaler/ScalingExecutor.java       |  15 ++
 .../autoscaler/ScalingMetricCollector.java         |   2 +-
 .../autoscaler/ScalingMetricEvaluator.java         |   2 +-
 .../operator/autoscaler/metrics/ScalingMetric.java |   5 +
 .../autoscaler/RecommendedParallelismTest.java     | 290 +++++++++++++++++++++
 7 files changed, 354 insertions(+), 14 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
index f1084e6e..fab3cb32 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
@@ -78,10 +78,10 @@ public class AutoscalerFlinkMetrics {
                                                         Optional.ofNullable(
                                                                         
currentVertexMetrics.get())
                                                                 .map(m -> 
m.get(jobVertexID))
+                                                                .map(metrics 
-> metrics.get(sm))
                                                                 .map(
-                                                                        
metrics ->
-                                                                               
 metrics.get(sm)
-                                                                               
         .getCurrent())
+                                                                        
EvaluatedScalingMetric
+                                                                               
 ::getCurrent)
                                                                 .orElse(null));
 
                                         if (sm.isCalculateAverage()) {
@@ -92,10 +92,10 @@ public class AutoscalerFlinkMetrics {
                                                                             
currentVertexMetrics
                                                                                
     .get())
                                                                     .map(m -> 
m.get(jobVertexID))
+                                                                    
.map(metrics -> metrics.get(sm))
                                                                     .map(
-                                                                            
metrics ->
-                                                                               
     metrics.get(sm)
-                                                                               
             .getAverage())
+                                                                            
EvaluatedScalingMetric
+                                                                               
     ::getAverage)
                                                                     
.orElse(null));
                                         }
                                     });
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index b05038a5..44c361d7 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
@@ -35,6 +36,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 
 /** Application and SessionJob autoscaler. */
 public class JobAutoScalerImpl implements JobAutoScaler {
@@ -47,8 +50,11 @@ public class JobAutoScalerImpl implements JobAutoScaler {
     private final ScalingExecutor scalingExecutor;
     private final EventRecorder eventRecorder;
 
-    private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>
+    @VisibleForTesting
+    final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>
             lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
     final Map<ResourceID, AutoscalerFlinkMetrics> flinkMetrics = new 
ConcurrentHashMap<>();
 
     public JobAutoScalerImpl(
@@ -79,6 +85,8 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         var conf = ctx.getObserveConfig();
         var resource = ctx.getResource();
         var resourceId = ResourceID.fromResource(resource);
+        var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
+        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics = null;
 
         try {
 
@@ -88,10 +96,10 @@ public class JobAutoScalerImpl implements JobAutoScaler {
             }
 
             // Initialize metrics only if autoscaler is enabled
-            var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx, 
resourceId);
 
             if 
(!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name()))
 {
                 LOG.info("Job autoscaler is waiting for RUNNING job state");
+                lastEvaluatedMetrics.remove(resourceId);
                 return false;
             }
 
@@ -107,13 +115,13 @@ public class JobAutoScalerImpl implements JobAutoScaler {
             }
 
             LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
-            var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+            evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+            initRecommendedParallelism(evaluatedMetrics);
             LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
-            lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
-            flinkMetrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceId));
 
             if (!collectedMetrics.isFullyCollected()) {
                 // We have done an upfront evaluation, but we are not ready 
for scaling.
+                resetRecommendedParallelism(evaluatedMetrics);
                 autoScalerInfo.replaceInKubernetes(kubernetesClient);
                 return false;
             }
@@ -129,7 +137,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
             return specAdjusted;
         } catch (Throwable e) {
             LOG.error("Error while scaling resource", e);
-            getOrInitAutoscalerFlinkMetrics(ctx, resourceId).numErrors.inc();
+            flinkMetrics.numErrors.inc();
             eventRecorder.triggerEvent(
                     resource,
                     EventRecorder.Type.Warning,
@@ -137,6 +145,12 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                     EventRecorder.Component.Operator,
                     e.getMessage());
             return false;
+        } finally {
+            if (evaluatedMetrics != null) {
+                LOG.debug("Storing evaluated metrics {}", evaluatedMetrics);
+                lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
+                flinkMetrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceId));
+            }
         }
     }
 
@@ -148,4 +162,20 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                         new AutoscalerFlinkMetrics(
                                 
ctx.getResourceMetricGroup().addGroup("AutoScaler")));
     }
+
+    private void initRecommendedParallelism(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics) {
+        evaluatedMetrics.forEach(
+                (jobVertexID, evaluatedScalingMetricMap) ->
+                        evaluatedScalingMetricMap.put(
+                                RECOMMENDED_PARALLELISM,
+                                evaluatedScalingMetricMap.get(PARALLELISM)));
+    }
+
+    private void resetRecommendedParallelism(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics) {
+        evaluatedMetrics.forEach(
+                (jobVertexID, evaluatedScalingMetricMap) ->
+                        evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM, 
null));
+    }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index a5da1cb7..03267797 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -101,6 +101,8 @@ public class ScalingExecutor {
             return false;
         }
 
+        updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);
+
         var scalingEnabled = conf.get(SCALING_ENABLED);
 
         var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
@@ -129,6 +131,19 @@ public class ScalingExecutor {
         return true;
     }
 
+    private void updateRecommendedParallelism(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        scalingSummaries.forEach(
+                (jobVertexID, scalingSummary) -> {
+                    evaluatedMetrics
+                            .get(jobVertexID)
+                            .put(
+                                    ScalingMetric.RECOMMENDED_PARALLELISM,
+                                    
EvaluatedScalingMetric.of(scalingSummary.getNewParallelism()));
+                });
+    }
+
     private static String scalingReport(
             Map<JobVertexID, ScalingSummary> scalingSummaries, boolean 
scalingEnabled) {
         StringBuilder sb =
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 365da10e..033029a5 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -83,7 +83,6 @@ public abstract class ScalingMetricCollector {
             Configuration conf)
             throws Exception {
 
-        var topology = getJobTopology(flinkService, cr, conf, autoscalerInfo);
         var resourceID = ResourceID.fromResource(cr);
         var now = clock.instant();
 
@@ -101,6 +100,7 @@ public abstract class ScalingMetricCollector {
             metricHistory.clear();
             metricCollectionStartTs = now;
         }
+        var topology = getJobTopology(flinkService, cr, conf, autoscalerInfo);
 
         // Trim metrics outside the metric window from metrics history
         var metricWindowSize = getMetricWindowSize(conf);
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 5b4d3b3a..ebf87e25 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -139,10 +139,10 @@ public class ScalingMetricEvaluator {
 
         evaluatedMetrics.put(
                 PARALLELISM, 
EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex)));
+
         evaluatedMetrics.put(
                 MAX_PARALLELISM,
                 
EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
-
         computeProcessingRateThresholds(evaluatedMetrics, conf, 
processingBacklog);
         return evaluatedMetrics;
     }
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index e41816ce..59f0d26a 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -49,8 +49,13 @@ public enum ScalingMetric {
 
     /** Total number of pending records. */
     LAG(false),
+
     /** Job vertex parallelism. */
     PARALLELISM(false),
+
+    /** Recommended job vertex parallelism. */
+    RECOMMENDED_PARALLELISM(false),
+
     /** Job vertex max parallelism. */
     MAX_PARALLELISM(false),
     /** Upper boundary of the target data rate range. */
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
new file mode 100644
index 00000000..c0dd8fa9
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.OperatorTestBase;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for recommended parallelism. */
+@EnableKubernetesMockClient(crud = true)
+public class RecommendedParallelismTest extends OperatorTestBase {
+
+    @Getter private KubernetesClient kubernetesClient;
+
+    private ScalingMetricEvaluator evaluator;
+    private TestingMetricsCollector metricsCollector;
+    private ScalingExecutor scalingExecutor;
+
+    private FlinkDeployment app;
+    private JobVertexID source, sink;
+
+    private JobAutoScalerImpl autoscaler;
+
+    private EventCollector eventCollector = new EventCollector();
+
+    @BeforeEach
+    public void setup() {
+        evaluator = new ScalingMetricEvaluator();
+        scalingExecutor =
+                new ScalingExecutor(
+                        kubernetesClient,
+                        new EventRecorder(kubernetesClient, new 
EventCollector()));
+
+        app = TestUtils.buildApplicationCluster();
+        app.getMetadata().setGeneration(1L);
+        app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
+        kubernetesClient.resource(app).createOrReplace();
+
+        source = new JobVertexID();
+        sink = new JobVertexID();
+
+        metricsCollector =
+                new TestingMetricsCollector(
+                        new JobTopology(
+                                new VertexInfo(source, Set.of(), 1, 720),
+                                new VertexInfo(sink, Set.of(source), 1, 720)));
+
+        var defaultConf = new Configuration();
+        defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, 
Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
+        defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, 
Duration.ofSeconds(2));
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, 
Duration.ZERO);
+
+        configManager = new FlinkConfigManager(defaultConf);
+        ReconciliationUtils.updateStatusForDeployedSpec(
+                app, configManager.getDeployConfig(app.getMetadata(), 
app.getSpec()));
+        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+
+        autoscaler =
+                new JobAutoScalerImpl(
+                        kubernetesClient,
+                        metricsCollector,
+                        evaluator,
+                        scalingExecutor,
+                        new EventRecorder(kubernetesClient, eventCollector));
+
+        // Reset custom window size to default
+        metricsCollector.setTestMetricWindowSize(null);
+    }
+
+    @Test
+    public void endToEnd() throws Exception {
+
+        // we start the autoscaler in advisor mode
+        
app.getSpec().getFlinkConfiguration().put(AutoScalerOptions.SCALING_ENABLED.key(),
 "false");
+        var ctx = createAutoscalerTestContext();
+
+        // initially the last evaluated metrics are empty
+        
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
+
+        var now = Instant.ofEpochMilli(0);
+        setClocksTo(now);
+        running(now);
+
+        metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2));
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 850., 
Double.NaN, Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 850., 
Double.NaN, Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
+
+        // the recommended parallelism values are empty initially
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertEquals(
+                1, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+        assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        assertEquals(1., getCurrentMetricValue(source, PARALLELISM));
+        assertEquals(1., getCurrentMetricValue(sink, PARALLELISM));
+        // the auto scaler is running in recommendation mode
+        assertNull(ScalingExecutorTest.getScaledParallelism(app));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+
+        // it stays empty until the metric window is full
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertEquals(
+                2, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+        assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        assertEquals(1., getCurrentMetricValue(source, PARALLELISM));
+        assertEquals(1., getCurrentMetricValue(sink, PARALLELISM));
+        assertNull(ScalingExecutorTest.getScaledParallelism(app));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+
+        // then the recommended parallelism can change according to the 
evaluated metrics
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertEquals(
+                3, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        assertEquals(1., getCurrentMetricValue(source, PARALLELISM));
+        assertEquals(1., getCurrentMetricValue(sink, PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        assertNull(ScalingExecutorTest.getScaledParallelism(app));
+
+        // once scaling is enabled
+        
app.getSpec().getFlinkConfiguration().put(AutoScalerOptions.SCALING_ENABLED.key(),
 "true");
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+
+        // the scaled parallelism will pick up the recommended parallelism 
values
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertEquals(
+                3, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        assertEquals(4., getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        // updating the topology to reflect the scale
+        metricsCollector.setJobTopology(
+                new JobTopology(
+                        new VertexInfo(source, Set.of(), 4, 24),
+                        new VertexInfo(sink, Set.of(source), 4, 720)));
+
+        now = now.plus(Duration.ofSeconds(10));
+        setClocksTo(now);
+        restart(now);
+
+        // after restart while the job is not running the evaluated metrics 
are gone
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertEquals(
+                3, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        running(now);
+
+        // once the job is running we got back the evaluated metric except the 
recommended
+        // parallelisms (until the metric window is full again)
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertEquals(
+                1, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        assertEquals(4., getCurrentMetricValue(source, PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(sink, PARALLELISM));
+        assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+        assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source));
+        assertEquals(4, scaledParallelism.get(sink));
+    }
+
+    private Double getCurrentMetricValue(JobVertexID jobVertexID, 
ScalingMetric scalingMetric) {
+        var metric =
+                autoscaler
+                        .lastEvaluatedMetrics
+                        .get(ResourceID.fromResource(app))
+                        .get(jobVertexID)
+                        .get(scalingMetric);
+        return metric == null ? null : metric.getCurrent();
+    }
+
+    private void restart(Instant now) {
+        
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+        app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
+    }
+
+    private void running(Instant now) {
+        
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+    }
+
+    private void setClocksTo(Instant time) {
+        var clock = Clock.fixed(time, ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        scalingExecutor.setClock(clock);
+    }
+
+    @NotNull
+    private TestUtils.TestingContext<HasMetadata> 
createAutoscalerTestContext() {
+        return new TestUtils.TestingContext<>() {
+            public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
+                return (Set)
+                        
kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream()
+                                .collect(Collectors.toSet());
+            }
+        };
+    }
+}

Reply via email to