This is an automated email from the ASF dual-hosted git repository.
morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f9ec6361 [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric
f9ec6361 is described below
commit f9ec6361f09df4f5d34cd10185538a89d9417b4a
Author: Matyas Orhidi <[email protected]>
AuthorDate: Thu Jun 8 17:03:56 2023 -0700
[FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric
---
.../autoscaler/AutoscalerFlinkMetrics.java | 12 +-
.../operator/autoscaler/JobAutoScalerImpl.java | 42 ++-
.../operator/autoscaler/ScalingExecutor.java | 15 ++
.../autoscaler/ScalingMetricCollector.java | 2 +-
.../autoscaler/ScalingMetricEvaluator.java | 2 +-
.../operator/autoscaler/metrics/ScalingMetric.java | 5 +
.../autoscaler/RecommendedParallelismTest.java | 290 +++++++++++++++++++++
7 files changed, 354 insertions(+), 14 deletions(-)
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
index f1084e6e..fab3cb32 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
@@ -78,10 +78,10 @@ public class AutoscalerFlinkMetrics {
Optional.ofNullable(
currentVertexMetrics.get())
.map(m ->
m.get(jobVertexID))
+ .map(metrics
-> metrics.get(sm))
.map(
-
metrics ->
-
metrics.get(sm)
-
.getCurrent())
+
EvaluatedScalingMetric
+
::getCurrent)
.orElse(null));
if (sm.isCalculateAverage()) {
@@ -92,10 +92,10 @@ public class AutoscalerFlinkMetrics {
currentVertexMetrics
.get())
.map(m ->
m.get(jobVertexID))
+
.map(metrics -> metrics.get(sm))
.map(
-
metrics ->
-
metrics.get(sm)
-
.getAverage())
+
EvaluatedScalingMetric
+
::getAverage)
.orElse(null));
}
});
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index b05038a5..44c361d7 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
@@ -35,6 +36,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
/** Application and SessionJob autoscaler. */
public class JobAutoScalerImpl implements JobAutoScaler {
@@ -47,8 +50,11 @@ public class JobAutoScalerImpl implements JobAutoScaler {
private final ScalingExecutor scalingExecutor;
private final EventRecorder eventRecorder;
- private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>
+ @VisibleForTesting
+ final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>
lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+ @VisibleForTesting
final Map<ResourceID, AutoscalerFlinkMetrics> flinkMetrics = new
ConcurrentHashMap<>();
public JobAutoScalerImpl(
@@ -79,6 +85,8 @@ public class JobAutoScalerImpl implements JobAutoScaler {
var conf = ctx.getObserveConfig();
var resource = ctx.getResource();
var resourceId = ResourceID.fromResource(resource);
+ var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics = null;
try {
@@ -88,10 +96,10 @@ public class JobAutoScalerImpl implements JobAutoScaler {
}
// Initialize metrics only if autoscaler is enabled
- var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx,
resourceId);
if
(!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name()))
{
LOG.info("Job autoscaler is waiting for RUNNING job state");
+ lastEvaluatedMetrics.remove(resourceId);
return false;
}
@@ -107,13 +115,13 @@ public class JobAutoScalerImpl implements JobAutoScaler {
}
LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
- var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+ evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+ initRecommendedParallelism(evaluatedMetrics);
LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
- lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
- flinkMetrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceId));
if (!collectedMetrics.isFullyCollected()) {
// We have done an upfront evaluation, but we are not ready
for scaling.
+ resetRecommendedParallelism(evaluatedMetrics);
autoScalerInfo.replaceInKubernetes(kubernetesClient);
return false;
}
@@ -129,7 +137,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
return specAdjusted;
} catch (Throwable e) {
LOG.error("Error while scaling resource", e);
- getOrInitAutoscalerFlinkMetrics(ctx, resourceId).numErrors.inc();
+ flinkMetrics.numErrors.inc();
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Warning,
@@ -137,6 +145,12 @@ public class JobAutoScalerImpl implements JobAutoScaler {
EventRecorder.Component.Operator,
e.getMessage());
return false;
+ } finally {
+ if (evaluatedMetrics != null) {
+ LOG.debug("Storing evaluated metrics {}", evaluatedMetrics);
+ lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
+ flinkMetrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceId));
+ }
}
}
@@ -148,4 +162,20 @@ public class JobAutoScalerImpl implements JobAutoScaler {
new AutoscalerFlinkMetrics(
ctx.getResourceMetricGroup().addGroup("AutoScaler")));
}
+
+ private void initRecommendedParallelism(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics) {
+ evaluatedMetrics.forEach(
+ (jobVertexID, evaluatedScalingMetricMap) ->
+ evaluatedScalingMetricMap.put(
+ RECOMMENDED_PARALLELISM,
+ evaluatedScalingMetricMap.get(PARALLELISM)));
+ }
+
+ private void resetRecommendedParallelism(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics) {
+ evaluatedMetrics.forEach(
+ (jobVertexID, evaluatedScalingMetricMap) ->
+ evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM,
null));
+ }
}
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index a5da1cb7..03267797 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -101,6 +101,8 @@ public class ScalingExecutor {
return false;
}
+ updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);
+
var scalingEnabled = conf.get(SCALING_ENABLED);
var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
@@ -129,6 +131,19 @@ public class ScalingExecutor {
return true;
}
+ private void updateRecommendedParallelism(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ scalingSummaries.forEach(
+ (jobVertexID, scalingSummary) -> {
+ evaluatedMetrics
+ .get(jobVertexID)
+ .put(
+ ScalingMetric.RECOMMENDED_PARALLELISM,
+
EvaluatedScalingMetric.of(scalingSummary.getNewParallelism()));
+ });
+ }
+
private static String scalingReport(
Map<JobVertexID, ScalingSummary> scalingSummaries, boolean
scalingEnabled) {
StringBuilder sb =
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 365da10e..033029a5 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -83,7 +83,6 @@ public abstract class ScalingMetricCollector {
Configuration conf)
throws Exception {
- var topology = getJobTopology(flinkService, cr, conf, autoscalerInfo);
var resourceID = ResourceID.fromResource(cr);
var now = clock.instant();
@@ -101,6 +100,7 @@ public abstract class ScalingMetricCollector {
metricHistory.clear();
metricCollectionStartTs = now;
}
+ var topology = getJobTopology(flinkService, cr, conf, autoscalerInfo);
// Trim metrics outside the metric window from metrics history
var metricWindowSize = getMetricWindowSize(conf);
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 5b4d3b3a..ebf87e25 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -139,10 +139,10 @@ public class ScalingMetricEvaluator {
evaluatedMetrics.put(
PARALLELISM,
EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex)));
+
evaluatedMetrics.put(
MAX_PARALLELISM,
EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
-
computeProcessingRateThresholds(evaluatedMetrics, conf,
processingBacklog);
return evaluatedMetrics;
}
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index e41816ce..59f0d26a 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -49,8 +49,13 @@ public enum ScalingMetric {
/** Total number of pending records. */
LAG(false),
+
/** Job vertex parallelism. */
PARALLELISM(false),
+
+ /** Recommended job vertex parallelism. */
+ RECOMMENDED_PARALLELISM(false),
+
/** Job vertex max parallelism. */
MAX_PARALLELISM(false),
/** Upper boundary of the target data rate range. */
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
new file mode 100644
index 00000000..c0dd8fa9
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.OperatorTestBase;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for recommended parallelism. */
+@EnableKubernetesMockClient(crud = true)
+public class RecommendedParallelismTest extends OperatorTestBase {
+
+ @Getter private KubernetesClient kubernetesClient;
+
+ private ScalingMetricEvaluator evaluator;
+ private TestingMetricsCollector metricsCollector;
+ private ScalingExecutor scalingExecutor;
+
+ private FlinkDeployment app;
+ private JobVertexID source, sink;
+
+ private JobAutoScalerImpl autoscaler;
+
+ private EventCollector eventCollector = new EventCollector();
+
+ @BeforeEach
+ public void setup() {
+ evaluator = new ScalingMetricEvaluator();
+ scalingExecutor =
+ new ScalingExecutor(
+ kubernetesClient,
+ new EventRecorder(kubernetesClient, new
EventCollector()));
+
+ app = TestUtils.buildApplicationCluster();
+ app.getMetadata().setGeneration(1L);
+ app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
+ kubernetesClient.resource(app).createOrReplace();
+
+ source = new JobVertexID();
+ sink = new JobVertexID();
+
+ metricsCollector =
+ new TestingMetricsCollector(
+ new JobTopology(
+ new VertexInfo(source, Set.of(), 1, 720),
+ new VertexInfo(sink, Set.of(source), 1, 720)));
+
+ var defaultConf = new Configuration();
+ defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL,
Duration.ZERO);
+ defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
+ defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION,
Duration.ofSeconds(2));
+ defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double)
Integer.MAX_VALUE);
+ defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
+ defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD,
Duration.ZERO);
+
+ configManager = new FlinkConfigManager(defaultConf);
+ ReconciliationUtils.updateStatusForDeployedSpec(
+ app, configManager.getDeployConfig(app.getMetadata(),
app.getSpec()));
+ app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+
+ autoscaler =
+ new JobAutoScalerImpl(
+ kubernetesClient,
+ metricsCollector,
+ evaluator,
+ scalingExecutor,
+ new EventRecorder(kubernetesClient, eventCollector));
+
+ // Reset custom window size to default
+ metricsCollector.setTestMetricWindowSize(null);
+ }
+
+ @Test
+ public void endToEnd() throws Exception {
+
+ // we start the autoscaler in advisor mode
+
app.getSpec().getFlinkConfiguration().put(AutoScalerOptions.SCALING_ENABLED.key(),
"false");
+ var ctx = createAutoscalerTestContext();
+
+ // initially the last evaluated metrics are empty
+
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
+
+ var now = Instant.ofEpochMilli(0);
+ setClocksTo(now);
+ running(now);
+
+ metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2));
+ metricsCollector.setCurrentMetrics(
+ Map.of(
+ source,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, 850.,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
+ FlinkMetric.PENDING_RECORDS,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
+ sink,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, 850.,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
+
+ // the recommended parallelism values are empty initially
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertEquals(
+ 1, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
+ assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+ assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+ assertEquals(1., getCurrentMetricValue(source, PARALLELISM));
+ assertEquals(1., getCurrentMetricValue(sink, PARALLELISM));
+ // the auto scaler is running in recommendation mode
+ assertNull(ScalingExecutorTest.getScaledParallelism(app));
+
+ now = now.plus(Duration.ofSeconds(1));
+ setClocksTo(now);
+
+ // it stays empty until the metric window is full
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertEquals(
+ 2, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
+ assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+ assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+ assertEquals(1., getCurrentMetricValue(source, PARALLELISM));
+ assertEquals(1., getCurrentMetricValue(sink, PARALLELISM));
+ assertNull(ScalingExecutorTest.getScaledParallelism(app));
+
+ now = now.plus(Duration.ofSeconds(1));
+ setClocksTo(now);
+
+ // then the recommended parallelism can change according to the
evaluated metrics
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertEquals(
+ 3, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
+ assertEquals(1., getCurrentMetricValue(source, PARALLELISM));
+ assertEquals(1., getCurrentMetricValue(sink, PARALLELISM));
+ assertEquals(4., getCurrentMetricValue(source,
RECOMMENDED_PARALLELISM));
+ assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+ assertNull(ScalingExecutorTest.getScaledParallelism(app));
+
+ // once scaling is enabled
+
app.getSpec().getFlinkConfiguration().put(AutoScalerOptions.SCALING_ENABLED.key(),
"true");
+
+ now = now.plus(Duration.ofSeconds(1));
+ setClocksTo(now);
+
+ // the scaled parallelism will pick up the recommended parallelism
values
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertEquals(
+ 3, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
+ assertEquals(4., getCurrentMetricValue(source,
RECOMMENDED_PARALLELISM));
+ assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+ var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+ assertEquals(4, scaledParallelism.get(source));
+ assertEquals(4, scaledParallelism.get(sink));
+
+ // updating the topology to reflect the scale
+ metricsCollector.setJobTopology(
+ new JobTopology(
+ new VertexInfo(source, Set.of(), 4, 24),
+ new VertexInfo(sink, Set.of(source), 4, 720)));
+
+ now = now.plus(Duration.ofSeconds(10));
+ setClocksTo(now);
+ restart(now);
+
+ // after restart while the job is not running the evaluated metrics
are gone
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertEquals(
+ 3, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
+
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
+ scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+ assertEquals(4, scaledParallelism.get(source));
+ assertEquals(4, scaledParallelism.get(sink));
+
+ now = now.plus(Duration.ofSeconds(1));
+ setClocksTo(now);
+ running(now);
+
+ // once the job is running we got back the evaluated metric except the
recommended
+ // parallelisms (until the metric window is full again)
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertEquals(
+ 1, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
+ assertEquals(4., getCurrentMetricValue(source, PARALLELISM));
+ assertEquals(4., getCurrentMetricValue(sink, PARALLELISM));
+ assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+ assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+ scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+ assertEquals(4, scaledParallelism.get(source));
+ assertEquals(4, scaledParallelism.get(sink));
+ }
+
+ private Double getCurrentMetricValue(JobVertexID jobVertexID,
ScalingMetric scalingMetric) {
+ var metric =
+ autoscaler
+ .lastEvaluatedMetrics
+ .get(ResourceID.fromResource(app))
+ .get(jobVertexID)
+ .get(scalingMetric);
+ return metric == null ? null : metric.getCurrent();
+ }
+
+ private void restart(Instant now) {
+
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+ app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
+ }
+
+ private void running(Instant now) {
+
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+ app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ }
+
+ private void setClocksTo(Instant time) {
+ var clock = Clock.fixed(time, ZoneId.systemDefault());
+ metricsCollector.setClock(clock);
+ scalingExecutor.setClock(clock);
+ }
+
+ @NotNull
+ private TestUtils.TestingContext<HasMetadata>
createAutoscalerTestContext() {
+ return new TestUtils.TestingContext<>() {
+ public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
+ return (Set)
+
kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream()
+ .collect(Collectors.toSet());
+ }
+ };
+ }
+}