mxm commented on code in PR #613:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/613#discussion_r1222756888
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java:
##########
@@ -45,7 +43,13 @@ public class AutoscalerFlinkMetrics {
private final MetricGroup metricGroup;
- private final Set<JobVertexID> vertexMetrics = new HashSet<>();
+ private final Map<JobVertexID, MetricGroup> vertexMetricGroups = new
ConcurrentHashMap<>();
+ private final Map<Tuple2<JobVertexID, ScalingMetric>, MetricGroup>
scalingMetricGroups =
+ new ConcurrentHashMap<>();
+ private final Map<Tuple2<JobVertexID, ScalingMetric>, Gauge<Double>>
currentScalingMetrics =
+ new ConcurrentHashMap<>();
+ private final Map<Tuple2<JobVertexID, ScalingMetric>, Gauge<Double>>
averageScalingMetrics =
+ new ConcurrentHashMap<>();
Review Comment:
Please revert to using a non-concurrent hash map. This is not accessed
concurrently.
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java:
##########
@@ -54,51 +58,122 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
- public void registerScalingMetrics(
+ public void registerEvaluatedScalingMetrics(
Supplier<Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>
currentVertexMetrics) {
currentVertexMetrics
.get()
.forEach(
- (jobVertexID, evaluated) -> {
- if (!vertexMetrics.add(jobVertexID)) {
- return;
- }
- LOG.info("Registering scaling metrics for job
vertex {}", jobVertexID);
- var jobVertexMg =
- metricGroup.addGroup("jobVertexID",
jobVertexID.toHexString());
-
- evaluated.forEach(
- (sm, esm) -> {
- var smGroup =
jobVertexMg.addGroup(sm.name());
-
- smGroup.gauge(
- "Current",
- () ->
- Optional.ofNullable(
-
currentVertexMetrics.get())
- .map(m ->
m.get(jobVertexID))
- .map(
-
metrics ->
-
metrics.get(sm)
-
.getCurrent())
- .orElse(null));
-
- if (sm.isCalculateAverage()) {
- smGroup.gauge(
- "Average",
- () ->
-
Optional.ofNullable(
-
currentVertexMetrics
-
.get())
- .map(m ->
m.get(jobVertexID))
- .map(
-
metrics ->
-
metrics.get(sm)
-
.getAverage())
-
.orElse(null));
- }
+ (jobVertexID, evaluatedScalingMetrics) -> {
+ var jobVertexMg =
getJobVertexMetricGroup(jobVertexID);
+ evaluatedScalingMetrics.forEach(
+ (scalingMetric, esm) -> {
+ var smg =
+ getScalingMetricGroup(
+ jobVertexID,
jobVertexMg, scalingMetric);
+ registerScalingMetric(
+ currentVertexMetrics,
+ jobVertexID,
+ scalingMetric,
+ smg);
});
});
}
+
+ public void registerRecommendedParallelismMetrics(
+ Supplier<Map<JobVertexID, Integer>> recommendedParallelisms) {
+
+ if (recommendedParallelisms == null || recommendedParallelisms.get()
== null) {
+ return;
+ }
+ recommendedParallelisms
+ .get()
+ .forEach(
+ (jobVertexID, parallelism) -> {
+ var jobVertexMg =
getJobVertexMetricGroup(jobVertexID);
+ var smg =
+ getScalingMetricGroup(
+ jobVertexID, jobVertexMg,
RECOMMENDED_PARALLELISM);
+
+ currentScalingMetrics.computeIfAbsent(
+ Tuple2.of(jobVertexID,
RECOMMENDED_PARALLELISM),
+ key ->
+ smg.gauge(
+ "Current",
+ () ->
+ getCurrentValue(
+
recommendedParallelisms,
+
jobVertexID)));
+ });
+ }
Review Comment:
Why does this require a separate method? The recommended parallelism should
just be exposed like any other evaluated scaling metric.
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java:
##########
@@ -54,51 +58,122 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
- public void registerScalingMetrics(
+ public void registerEvaluatedScalingMetrics(
Supplier<Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>
currentVertexMetrics) {
currentVertexMetrics
.get()
.forEach(
- (jobVertexID, evaluated) -> {
- if (!vertexMetrics.add(jobVertexID)) {
- return;
- }
- LOG.info("Registering scaling metrics for job
vertex {}", jobVertexID);
- var jobVertexMg =
- metricGroup.addGroup("jobVertexID",
jobVertexID.toHexString());
-
- evaluated.forEach(
- (sm, esm) -> {
- var smGroup =
jobVertexMg.addGroup(sm.name());
-
- smGroup.gauge(
- "Current",
- () ->
- Optional.ofNullable(
-
currentVertexMetrics.get())
- .map(m ->
m.get(jobVertexID))
- .map(
-
metrics ->
-
metrics.get(sm)
-
.getCurrent())
- .orElse(null));
-
- if (sm.isCalculateAverage()) {
- smGroup.gauge(
- "Average",
- () ->
-
Optional.ofNullable(
-
currentVertexMetrics
-
.get())
- .map(m ->
m.get(jobVertexID))
- .map(
-
metrics ->
-
metrics.get(sm)
-
.getAverage())
-
.orElse(null));
- }
Review Comment:
As far as I understand, this should have been a separate commit or PR. The
refactoring wasn't necessary to make this work.
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##########
@@ -143,4 +163,18 @@ private AutoscalerFlinkMetrics
getOrInitAutoscalerFlinkMetrics(
new AutoscalerFlinkMetrics(
ctx.getResourceMetricGroup().addGroup("AutoScaler")));
}
+
+ private void resetRecommendedParallelisms(
Review Comment:
(2) Special casing for the PR feature in new component
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java:
##########
@@ -54,51 +58,122 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
- public void registerScalingMetrics(
+ public void registerEvaluatedScalingMetrics(
Supplier<Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>
currentVertexMetrics) {
currentVertexMetrics
.get()
.forEach(
- (jobVertexID, evaluated) -> {
- if (!vertexMetrics.add(jobVertexID)) {
- return;
- }
- LOG.info("Registering scaling metrics for job
vertex {}", jobVertexID);
- var jobVertexMg =
- metricGroup.addGroup("jobVertexID",
jobVertexID.toHexString());
-
- evaluated.forEach(
- (sm, esm) -> {
- var smGroup =
jobVertexMg.addGroup(sm.name());
-
- smGroup.gauge(
- "Current",
- () ->
- Optional.ofNullable(
-
currentVertexMetrics.get())
- .map(m ->
m.get(jobVertexID))
- .map(
-
metrics ->
-
metrics.get(sm)
-
.getCurrent())
- .orElse(null));
-
- if (sm.isCalculateAverage()) {
- smGroup.gauge(
- "Average",
- () ->
-
Optional.ofNullable(
-
currentVertexMetrics
-
.get())
- .map(m ->
m.get(jobVertexID))
- .map(
-
metrics ->
-
metrics.get(sm)
-
.getAverage())
-
.orElse(null));
- }
+ (jobVertexID, evaluatedScalingMetrics) -> {
+ var jobVertexMg =
getJobVertexMetricGroup(jobVertexID);
+ evaluatedScalingMetrics.forEach(
+ (scalingMetric, esm) -> {
+ var smg =
+ getScalingMetricGroup(
+ jobVertexID,
jobVertexMg, scalingMetric);
+ registerScalingMetric(
+ currentVertexMetrics,
+ jobVertexID,
+ scalingMetric,
+ smg);
});
});
}
+
+ public void registerRecommendedParallelismMetrics(
Review Comment:
(1) Special casing for the PR feature in new component
##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java:
##########
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.autoscaler;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.OperatorTestBase;
-import org.apache.flink.kubernetes.operator.TestUtils;
-import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.EventCollector;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import lombok.Getter;
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** Test for scaling metrics collection logic. */
-@EnableKubernetesMockClient(crud = true)
-public class BacklogBasedScalingTest extends OperatorTestBase {
-
- @Getter private KubernetesClient kubernetesClient;
-
- private ScalingMetricEvaluator evaluator;
- private TestingMetricsCollector metricsCollector;
- private ScalingExecutor scalingExecutor;
-
- private FlinkDeployment app;
- private JobVertexID source1, sink;
-
- private JobAutoScalerImpl autoscaler;
-
- private EventCollector eventCollector = new EventCollector();
-
- @BeforeEach
- public void setup() {
- evaluator = new ScalingMetricEvaluator();
- scalingExecutor =
- new ScalingExecutor(
- kubernetesClient,
- new EventRecorder(kubernetesClient, new
EventCollector()));
-
- app = TestUtils.buildApplicationCluster();
- app.getMetadata().setGeneration(1L);
- app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
- kubernetesClient.resource(app).createOrReplace();
-
- source1 = new JobVertexID();
- sink = new JobVertexID();
-
- metricsCollector =
- new TestingMetricsCollector(
- new JobTopology(
- new VertexInfo(source1, Set.of(), 1, 720),
- new VertexInfo(sink, Set.of(source1), 1,
720)));
-
- var defaultConf = new Configuration();
- defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
- defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL,
Duration.ZERO);
- defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
- defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION,
Duration.ofSeconds(2));
- defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
- defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
- defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double)
Integer.MAX_VALUE);
- defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
- defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
- defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD,
Duration.ZERO);
-
- configManager = new FlinkConfigManager(defaultConf);
- ReconciliationUtils.updateStatusForDeployedSpec(
- app, configManager.getDeployConfig(app.getMetadata(),
app.getSpec()));
- app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
-
- autoscaler =
- new JobAutoScalerImpl(
- kubernetesClient,
- metricsCollector,
- evaluator,
- scalingExecutor,
- new EventRecorder(kubernetesClient, eventCollector));
-
- // Reset custom window size to default
- metricsCollector.setTestMetricWindowSize(null);
- }
-
- @Test
- public void test() throws Exception {
- var ctx = createAutoscalerTestContext();
-
- /* Test scaling up. */
- var now = Instant.ofEpochMilli(0);
- setClocksTo(now);
- redeployJob(now);
- // Adjust metric window size, so we can fill the metric window with
two metrics
- metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(1));
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 850.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 850.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
-
- autoscaler.scale(getResourceContext(app, ctx));
- assertEquals(
- 1, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
- assertFlinkMetricsCount(0, 0, ctx);
-
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
- autoscaler.scale(getResourceContext(app, ctx));
- assertEquals(
- 2, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
-
- var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(4, scaledParallelism.get(source1));
- assertEquals(4, scaledParallelism.get(sink));
- assertFlinkMetricsCount(1, 0, ctx);
-
- /* Test stability while processing pending records. */
-
- // Update topology to reflect updated parallelisms
- metricsCollector.setJobTopology(
- new JobTopology(
- new VertexInfo(source1, Set.of(), 4, 24),
- new VertexInfo(sink, Set.of(source1), 4, 720)));
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 1800.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 1800.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 2500.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 1800.))));
- now = now.plusSeconds(1);
- setClocksTo(now);
- // Redeploying which erases metric history
- redeployJob(now);
- // Adjust metric window size, so we can fill the metric window with
three metrics
- metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2));
-
- autoscaler.scale(getResourceContext(app, ctx));
- assertFlinkMetricsCount(1, 0, ctx);
- assertEquals(
- 1, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
- scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(4, scaledParallelism.get(source1));
- assertEquals(4, scaledParallelism.get(sink));
-
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 1800.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 1800.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 1200.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 1800.))));
-
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
- autoscaler.scale(getResourceContext(app, ctx));
- assertFlinkMetricsCount(1, 0, ctx);
- assertEquals(
- 2, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
- scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(4, scaledParallelism.get(source1));
- assertEquals(4, scaledParallelism.get(sink));
-
- /* Test scaling down. */
-
- // We have finally caught up to our original lag, time to scale down
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 600.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 800.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 800.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 0.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 600.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 800.))));
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
- autoscaler.scale(getResourceContext(app, ctx));
- assertFlinkMetricsCount(2, 0, ctx);
- assertEquals(
- 3, AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().size());
-
- scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(2, scaledParallelism.get(source1));
- assertEquals(2, scaledParallelism.get(sink));
-
- metricsCollector.setJobTopology(
- new JobTopology(
- new VertexInfo(source1, Set.of(), 2, 24),
- new VertexInfo(sink, Set.of(source1), 2, 720)));
-
- /* Test stability while processing backlog. */
-
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 900.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 900.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 900.))));
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
-
app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
- autoscaler.scale(getResourceContext(app, ctx));
- assertFlinkMetricsCount(2, 1, ctx);
- scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(2, scaledParallelism.get(source1));
- assertEquals(2, scaledParallelism.get(sink));
-
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 900.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 900.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 100.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 1000.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 900.))));
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
- autoscaler.scale(getResourceContext(app, ctx));
- assertFlinkMetricsCount(2, 2, ctx);
-
- scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(2, scaledParallelism.get(source1));
- assertEquals(2, scaledParallelism.get(sink));
-
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 500.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
- FlinkMetric.PENDING_RECORDS,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 0.)),
- sink,
- Map.of(
- FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 500.,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
- autoscaler.scale(getResourceContext(app, ctx));
- assertFlinkMetricsCount(2, 3, ctx);
- scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
- assertEquals(2, scaledParallelism.get(source1));
- assertEquals(2, scaledParallelism.get(sink));
- }
-
- @Test
- public void testMetricsPersistedAfterRedeploy() {
- var ctx = createAutoscalerTestContext();
- var now = Instant.ofEpochMilli(0);
- setClocksTo(now);
-
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
- metricsCollector.setCurrentMetrics(
- Map.of(
- source1,
- Map.of(
- FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.)),
- sink,
- Map.of(
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric(
- "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
-
- autoscaler.scale(getResourceContext(app, ctx));
- assertFalse(AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().isEmpty());
- }
-
- @Test
- public void testEventOnError() {
- // Invalid config
- app.getSpec()
- .getFlinkConfiguration()
- .put("kubernetes.operator.job.autoscaler.enabled", "3");
- autoscaler.scale(getResourceContext(app,
createAutoscalerTestContext()));
-
- var event = eventCollector.events.poll();
- assertTrue(eventCollector.events.isEmpty());
- assertEquals(EventRecorder.Reason.AutoscalerError.toString(),
event.getReason());
- assertTrue(event.getMessage().startsWith("Could not parse"));
- }
-
- private void redeployJob(Instant now) {
-
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
- }
-
- private void setClocksTo(Instant time) {
- var clock = Clock.fixed(time, ZoneId.systemDefault());
- metricsCollector.setClock(clock);
- scalingExecutor.setClock(clock);
- }
-
- @NotNull
- private TestUtils.TestingContext<HasMetadata>
createAutoscalerTestContext() {
- return new TestUtils.TestingContext<>() {
- public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
- return (Set)
-
kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream()
- .collect(Collectors.toSet());
- }
- };
- }
-
- private void assertFlinkMetricsCount(
- int scalingCount, int balancedCount,
TestUtils.TestingContext<HasMetadata> ctx) {
- AutoscalerFlinkMetrics autoscalerFlinkMetrics =
- autoscaler.flinkMetrics.get(
- ResourceID.fromResource(getResourceContext(app,
ctx).getResource()));
- assertEquals(scalingCount,
autoscalerFlinkMetrics.numScalings.getCount());
- assertEquals(balancedCount,
autoscalerFlinkMetrics.numBalanced.getCount());
- }
-}
Review Comment:
Can we keep this test class and keep the refactoring to a minimum in this
feature PR?
##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java:
##########
@@ -18,59 +18,537 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-/** Tests for JobAutoScalerImpl. */
+/** Test for {@link JobAutoScalerImpl }. */
@EnableKubernetesMockClient(crud = true)
public class JobAutoScalerImplTest extends OperatorTestBase {
@Getter private KubernetesClient kubernetesClient;
+ private ScalingMetricEvaluator evaluator;
+ private TestingMetricsCollector metricsCollector;
+ private ScalingExecutor scalingExecutor;
+
private FlinkDeployment app;
+ private JobVertexID source1, sink;
+
+ private JobAutoScalerImpl autoscaler;
+
+ private EventCollector eventCollector = new EventCollector();
@BeforeEach
public void setup() {
+ evaluator = new ScalingMetricEvaluator();
+ scalingExecutor =
+ new ScalingExecutor(
+ kubernetesClient,
+ new EventRecorder(kubernetesClient, new
EventCollector()));
+
app = TestUtils.buildApplicationCluster();
app.getMetadata().setGeneration(1L);
app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
kubernetesClient.resource(app).createOrReplace();
+ source1 = new JobVertexID();
+ sink = new JobVertexID();
+
+ metricsCollector =
+ new TestingMetricsCollector(
+ new JobTopology(
+ new VertexInfo(source1, Set.of(), 1, 720),
+ new VertexInfo(sink, Set.of(source1), 1,
720)));
+
var defaultConf = new Configuration();
- defaultConf.set(AUTOSCALER_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL,
Duration.ZERO);
+ defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
+ defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION,
Duration.ofSeconds(2));
+ defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double)
Integer.MAX_VALUE);
+ defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
+ defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD,
Duration.ZERO);
+
configManager = new FlinkConfigManager(defaultConf);
ReconciliationUtils.updateStatusForDeployedSpec(
app, configManager.getDeployConfig(app.getMetadata(),
app.getSpec()));
+ app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+
+ autoscaler =
+ new JobAutoScalerImpl(
+ kubernetesClient,
+ metricsCollector,
+ evaluator,
+ scalingExecutor,
+ new EventRecorder(kubernetesClient, eventCollector));
+
+ // Reset custom window size to default
+ metricsCollector.setTestMetricWindowSize(null);
+ }
+
+ @Test
+ public void testBacklogBasedScaling() throws Exception {
Review Comment:
Is this a new test or an existing one? It's hard to review when there are
many unrelated changes.
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##########
@@ -269,6 +273,25 @@ private void setVertexParallelismOverrides(
resource.getSpec().setFlinkConfiguration(flinkConf.toMap());
}
+ private void updateRecommendedParallelisms(
Review Comment:
(3) Special casing for the PR feature in new component
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -441,6 +443,17 @@ public void cleanup(AbstractFlinkResource<?, ?> cr) {
topologies.remove(resourceId);
}
+ private void cleanupRecommendedParallelisms(
Review Comment:
(4) Special casing for the PR feature in new component
##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java:
##########
@@ -68,6 +69,12 @@ public class MetricsCollectionAndEvaluationTest {
private final AutoScalerInfo scalingInfo = new AutoScalerInfo(new
HashMap<>());
+ private final Map<ResourceID, Map<JobVertexID, Integer>>
lastScalingSummaries =
+ new ConcurrentHashMap<>();
+
+ private final Map<ResourceID, Map<JobVertexID, Integer>>
recommendedParallelisms =
+ new ConcurrentHashMap<>();
Review Comment:
Test is not concurrent, hence a normal HashMap would suffice and would be
advisable here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]