This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit e2dd94d5083133e1f49f616b83a6423ba83969a1 Author: Maximilian Michels <[email protected]> AuthorDate: Tue May 9 21:00:57 2023 +0200 [FLINK-32005] Report executed scaling decisions --- .../autoscaler/AutoscalerFlinkMetrics.java | 104 +++++++++++++++++++++ .../operator/autoscaler/JobAutoScalerImpl.java | 90 +++++------------- .../autoscaler/BacklogBasedScalingTest.java | 19 ++++ .../operator/autoscaler/JobAutoScalerImplTest.java | 7 +- 4 files changed, 151 insertions(+), 69 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java new file mode 100644 index 00000000..f1084e6e --- /dev/null +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; + +/** Autoscaler metrics for observability. */ +public class AutoscalerFlinkMetrics { + + private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class); + + final Counter numScalings; + + final Counter numErrors; + + final Counter numBalanced; + + private final MetricGroup metricGroup; + + private final Set<JobVertexID> vertexMetrics = new HashSet<>(); + + public AutoscalerFlinkMetrics(MetricGroup metricGroup) { + this.numScalings = metricGroup.counter("scalings"); + this.numErrors = metricGroup.counter("errors"); + this.numBalanced = metricGroup.counter("balanced"); + this.metricGroup = metricGroup; + } + + public void registerScalingMetrics( + Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> + currentVertexMetrics) { + currentVertexMetrics + .get() + .forEach( + (jobVertexID, evaluated) -> { + if (!vertexMetrics.add(jobVertexID)) { + return; + } + LOG.info("Registering scaling metrics for job vertex {}", jobVertexID); + var jobVertexMg = + metricGroup.addGroup("jobVertexID", jobVertexID.toHexString()); + + evaluated.forEach( + (sm, esm) -> { + var smGroup = jobVertexMg.addGroup(sm.name()); + + smGroup.gauge( + "Current", + () -> + Optional.ofNullable( + currentVertexMetrics.get()) + .map(m -> m.get(jobVertexID)) + .map( + metrics -> + metrics.get(sm) + .getCurrent()) + .orElse(null)); + + if (sm.isCalculateAverage()) { + smGroup.gauge( + "Average", + () -> + Optional.ofNullable( + currentVertexMetrics + .get()) + .map(m -> m.get(jobVertexID)) + .map( + metrics -> + metrics.get(sm) + .getAverage()) + .orElse(null)); + } + }); + }); + } +} diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index 041c03e9..186de397 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -24,8 +24,6 @@ import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.JobVertexID; import io.fabric8.kubernetes.client.KubernetesClient; @@ -33,10 +31,7 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.Map; -import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; @@ -54,9 +49,7 @@ public class JobAutoScalerImpl implements JobAutoScaler { private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> lastEvaluatedMetrics = new ConcurrentHashMap<>(); - private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new ConcurrentHashMap<>(); - - final Map<ResourceID, Counter> errorCounters = new ConcurrentHashMap<>(); + final Map<ResourceID, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>(); public JobAutoScalerImpl( KubernetesClient kubernetesClient, @@ -77,7 +70,9 @@ public class JobAutoScalerImpl implements JobAutoScaler { metricsCollector.cleanup(cr); var resourceId = ResourceID.fromResource(cr); lastEvaluatedMetrics.remove(resourceId); - registeredMetrics.remove(resourceId); + // We are not removing the metrics registered with Flink (flinkMetrics) + // because Flink metrics can only be registered once! When the deployment + // comes back we would not be able to register and report metrics. } @Override @@ -85,8 +80,7 @@ public class JobAutoScalerImpl implements JobAutoScaler { var conf = ctx.getObserveConfig(); var resource = ctx.getResource(); - var resouceId = ResourceID.fromResource(resource); - var autoscalerMetricGroup = ctx.getResourceMetricGroup().addGroup("AutoScaler"); + var resourceId = ResourceID.fromResource(resource); try { @@ -95,6 +89,9 @@ public class JobAutoScalerImpl implements JobAutoScaler { return false; } + // Initialize metrics only if autoscaler is enabled + var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId); + if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) { LOG.info("Job autoscaler is waiting for RUNNING job state"); return false; @@ -114,18 +111,21 @@ public class JobAutoScalerImpl implements JobAutoScaler { LOG.debug("Evaluating scaling metrics for {}", collectedMetrics); var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics); LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics); - lastEvaluatedMetrics.put(resouceId, evaluatedMetrics); - registerResourceScalingMetrics(resource, autoscalerMetricGroup); + lastEvaluatedMetrics.put(resourceId, evaluatedMetrics); + flinkMetrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceId)); var specAdjusted = scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics); + if (specAdjusted) { + flinkMetrics.numScalings.inc(); + } else { + flinkMetrics.numBalanced.inc(); + } autoScalerInfo.replaceInKubernetes(kubernetesClient); return specAdjusted; } catch (Throwable e) { LOG.error("Error while scaling resource", e); - errorCounters - .computeIfAbsent(resouceId, _id -> autoscalerMetricGroup.counter("errors")) - .inc(); + getOrInitAutoscalerFlinkMetrics(ctx, resourceId).numErrors.inc(); eventRecorder.triggerEvent( resource, EventRecorder.Type.Warning, @@ -136,56 +136,12 @@ public class JobAutoScalerImpl implements JobAutoScaler { } } - private void registerResourceScalingMetrics( - AbstractFlinkResource<?, ?> resource, MetricGroup scalerGroup) { - var resourceId = ResourceID.fromResource(resource); - - lastEvaluatedMetrics - .get(resourceId) - .forEach( - (jobVertexID, evaluated) -> { - if (!registeredMetrics - .computeIfAbsent(resourceId, r -> new HashSet<>()) - .add(jobVertexID)) { - return; - } - LOG.info("Registering scaling metrics for job vertex {}", jobVertexID); - var jobVertexMg = - scalerGroup.addGroup("jobVertexID", jobVertexID.toHexString()); - - evaluated.forEach( - (sm, esm) -> { - var smGroup = jobVertexMg.addGroup(sm.name()); - - smGroup.gauge( - "Current", - () -> - Optional.ofNullable( - lastEvaluatedMetrics.get( - resourceId)) - .map(m -> m.get(jobVertexID)) - .map( - metrics -> - metrics.get(sm) - .getCurrent()) - .orElse(null)); - - if (sm.isCalculateAverage()) { - smGroup.gauge( - "Average", - () -> - Optional.ofNullable( - lastEvaluatedMetrics - .get( - resourceId)) - .map(m -> m.get(jobVertexID)) - .map( - metrics -> - metrics.get(sm) - .getAverage()) - .orElse(null)); - } - }); - }); + private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics( + FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx, ResourceID resouceId) { + return this.flinkMetrics.computeIfAbsent( + resouceId, + id -> + new AutoscalerFlinkMetrics( + ctx.getResourceMetricGroup().addGroup("AutoScaler"))); } } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java index 6809dd86..67cb8300 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import lombok.Getter; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; @@ -156,6 +157,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { autoscaler.scale(getResourceContext(app, ctx)); assertEquals( 1, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size()); + assertFlinkMetricsCount(0, 0, ctx); now = now.plus(Duration.ofSeconds(1)); setClocksTo(now); @@ -166,6 +168,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(4, scaledParallelism.get(source1)); assertEquals(4, scaledParallelism.get(sink)); + assertFlinkMetricsCount(1, 0, ctx); /* Test stability while processing pending records. */ @@ -202,6 +205,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2)); autoscaler.scale(getResourceContext(app, ctx)); + assertFlinkMetricsCount(1, 0, ctx); assertEquals( 1, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size()); scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); @@ -232,6 +236,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { now = now.plus(Duration.ofSeconds(1)); setClocksTo(now); autoscaler.scale(getResourceContext(app, ctx)); + assertFlinkMetricsCount(1, 0, ctx); assertEquals( 2, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size()); scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); @@ -263,6 +268,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { now = now.plus(Duration.ofSeconds(1)); setClocksTo(now); autoscaler.scale(getResourceContext(app, ctx)); + assertFlinkMetricsCount(2, 0, ctx); assertEquals( 3, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size()); @@ -300,6 +306,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { setClocksTo(now); app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli())); autoscaler.scale(getResourceContext(app, ctx)); + assertFlinkMetricsCount(2, 1, ctx); scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(2, scaledParallelism.get(source1)); assertEquals(2, scaledParallelism.get(sink)); @@ -326,6 +333,8 @@ public class BacklogBasedScalingTest extends OperatorTestBase { now = now.plus(Duration.ofSeconds(1)); setClocksTo(now); autoscaler.scale(getResourceContext(app, ctx)); + assertFlinkMetricsCount(2, 2, ctx); + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(2, scaledParallelism.get(source1)); assertEquals(2, scaledParallelism.get(sink)); @@ -352,6 +361,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { now = now.plus(Duration.ofSeconds(1)); setClocksTo(now); autoscaler.scale(getResourceContext(app, ctx)); + assertFlinkMetricsCount(2, 3, ctx); scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(2, scaledParallelism.get(source1)); assertEquals(2, scaledParallelism.get(sink)); @@ -415,4 +425,13 @@ public class BacklogBasedScalingTest extends OperatorTestBase { } }; } + + private void assertFlinkMetricsCount( + int scalingCount, int balancedCount, TestUtils.TestingContext<HasMetadata> ctx) { + AutoscalerFlinkMetrics autoscalerFlinkMetrics = + autoscaler.flinkMetrics.get( + ResourceID.fromResource(getResourceContext(app, ctx).getResource())); + assertEquals(scalingCount, autoscalerFlinkMetrics.numScalings.getCount()); + assertEquals(balancedCount, autoscalerFlinkMetrics.numBalanced.getCount()); + } } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java index 5303e5c7..8f478bf0 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.junit.jupiter.api.Assertions.assertEquals; /** Tests for JobAutoScalerImpl. */ @EnableKubernetesMockClient(crud = true) @@ -65,9 +66,11 @@ public class JobAutoScalerImplTest extends OperatorTestBase { ResourceID resourceId = ResourceID.fromResource(app); autoscaler.scale(resourceContext); - Assertions.assertEquals(1, autoscaler.errorCounters.get(resourceId).getCount()); + Assertions.assertEquals(1, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount()); autoscaler.scale(resourceContext); - Assertions.assertEquals(2, autoscaler.errorCounters.get(resourceId).getCount()); + Assertions.assertEquals(2, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount()); + + assertEquals(0, autoscaler.flinkMetrics.get(resourceId).numScalings.getCount()); } }
