This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit e2dd94d5083133e1f49f616b83a6423ba83969a1
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue May 9 21:00:57 2023 +0200

    [FLINK-32005] Report executed scaling decisions
---
 .../autoscaler/AutoscalerFlinkMetrics.java         | 104 +++++++++++++++++++++
 .../operator/autoscaler/JobAutoScalerImpl.java     |  90 +++++-------------
 .../autoscaler/BacklogBasedScalingTest.java        |  19 ++++
 .../operator/autoscaler/JobAutoScalerImplTest.java |   7 +-
 4 files changed, 151 insertions(+), 69 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
new file mode 100644
index 00000000..f1084e6e
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Autoscaler metrics for observability.
+ *
+ * <p>Exposes three counters ("scalings", "errors", "balanced") on the given metric group and, via
+ * {@link #registerScalingMetrics}, per job vertex gauges that report the most recently evaluated
+ * scaling metrics.
+ */
+public class AutoscalerFlinkMetrics {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class);
+
+    /** Counter registered as "scalings". */
+    final Counter numScalings;
+
+    /** Counter registered as "errors". */
+    final Counter numErrors;
+
+    /** Counter registered as "balanced". */
+    final Counter numBalanced;
+
+    /** Group under which the counters and all per-vertex gauges are registered. */
+    private final MetricGroup metricGroup;
+
+    /** Job vertices whose gauges were already registered; prevents registering them twice. */
+    private final Set<JobVertexID> vertexMetrics = new HashSet<>();
+
+    /**
+     * Creates the metrics holder and registers its counters.
+     *
+     * @param metricGroup group under which all metrics of this instance are registered
+     */
+    public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
+        this.numScalings = metricGroup.counter("scalings");
+        this.numErrors = metricGroup.counter("errors");
+        this.numBalanced = metricGroup.counter("balanced");
+        this.metricGroup = metricGroup;
+    }
+
+    /**
+     * Registers a "Current" gauge (plus an "Average" gauge for metrics that calculate an average)
+     * for every scaling metric of every job vertex that has not been registered before.
+     *
+     * <p>The gauges query {@code currentVertexMetrics} each time they are read, so they always
+     * report the latest evaluation. A gauge reports {@code null} when no value is currently
+     * available for its vertex or metric.
+     *
+     * @param currentVertexMetrics supplier of the latest evaluated metrics per job vertex; may
+     *     return {@code null}, in which case nothing is registered
+     */
+    public void registerScalingMetrics(
+            Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+                    currentVertexMetrics) {
+        var latestMetrics = currentVertexMetrics.get();
+        if (latestMetrics == null) {
+            // Nothing evaluated (or already cleaned up): there is nothing to register yet.
+            return;
+        }
+        latestMetrics.forEach(
+                (jobVertexID, evaluated) -> {
+                    // Gauges are registered only once per vertex; skip vertices seen before.
+                    if (!vertexMetrics.add(jobVertexID)) {
+                        return;
+                    }
+                    LOG.info("Registering scaling metrics for job vertex {}", jobVertexID);
+                    var jobVertexMg =
+                            metricGroup.addGroup("jobVertexID", jobVertexID.toHexString());
+
+                    evaluated.forEach(
+                            (sm, esm) -> {
+                                var smGroup = jobVertexMg.addGroup(sm.name());
+
+                                smGroup.gauge(
+                                        "Current",
+                                        () ->
+                                                getMetricValue(
+                                                        currentVertexMetrics,
+                                                        jobVertexID,
+                                                        sm,
+                                                        EvaluatedScalingMetric::getCurrent));
+
+                                if (sm.isCalculateAverage()) {
+                                    smGroup.gauge(
+                                            "Average",
+                                            () ->
+                                                    getMetricValue(
+                                                            currentVertexMetrics,
+                                                            jobVertexID,
+                                                            sm,
+                                                            EvaluatedScalingMetric::getAverage));
+                                }
+                            });
+                });
+    }
+
+    /**
+     * Reads one value of one scaling metric of one job vertex from the latest evaluation.
+     *
+     * @return the extracted value, or {@code null} if the supplier returns {@code null} or has no
+     *     entry for the vertex or for the metric (avoids throwing from inside a gauge)
+     */
+    private static Double getMetricValue(
+            Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+                    currentVertexMetrics,
+            JobVertexID jobVertexID,
+            ScalingMetric metric,
+            Function<EvaluatedScalingMetric, Double> valueExtractor) {
+        return Optional.ofNullable(currentVertexMetrics.get())
+                .map(byVertex -> byVertex.get(jobVertexID))
+                .map(byMetric -> byMetric.get(metric))
+                .map(valueExtractor)
+                .orElse(null);
+    }
+}
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 041c03e9..186de397 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -24,8 +24,6 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -33,10 +31,7 @@ import 
io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
@@ -54,9 +49,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
 
     private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>
             lastEvaluatedMetrics = new ConcurrentHashMap<>();
-    private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new 
ConcurrentHashMap<>();
-
-    final Map<ResourceID, Counter> errorCounters = new ConcurrentHashMap<>();
+    final Map<ResourceID, AutoscalerFlinkMetrics> flinkMetrics = new 
ConcurrentHashMap<>();
 
     public JobAutoScalerImpl(
             KubernetesClient kubernetesClient,
@@ -77,7 +70,9 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         metricsCollector.cleanup(cr);
         var resourceId = ResourceID.fromResource(cr);
         lastEvaluatedMetrics.remove(resourceId);
-        registeredMetrics.remove(resourceId);
+        // Intentionally keep the metrics registered with Flink (flinkMetrics): Flink metrics
+        // can only be registered once, so if we dropped them here we could not register and
+        // report metrics again when the deployment comes back.
     }
 
     @Override
@@ -85,8 +80,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
 
         var conf = ctx.getObserveConfig();
         var resource = ctx.getResource();
-        var resouceId = ResourceID.fromResource(resource);
-        var autoscalerMetricGroup = 
ctx.getResourceMetricGroup().addGroup("AutoScaler");
+        var resourceId = ResourceID.fromResource(resource);
 
         try {
 
@@ -95,6 +89,9 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                 return false;
             }
 
+            // Initialize metrics only if autoscaler is enabled
+            var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx, 
resourceId);
+
             if 
(!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name()))
 {
                 LOG.info("Job autoscaler is waiting for RUNNING job state");
                 return false;
@@ -114,18 +111,21 @@ public class JobAutoScalerImpl implements JobAutoScaler {
             LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
             var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
             LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
-            lastEvaluatedMetrics.put(resouceId, evaluatedMetrics);
-            registerResourceScalingMetrics(resource, autoscalerMetricGroup);
+            lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
+            flinkMetrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceId));
 
             var specAdjusted =
                     scalingExecutor.scaleResource(resource, autoScalerInfo, 
conf, evaluatedMetrics);
+            if (specAdjusted) {
+                flinkMetrics.numScalings.inc();
+            } else {
+                flinkMetrics.numBalanced.inc();
+            }
             autoScalerInfo.replaceInKubernetes(kubernetesClient);
             return specAdjusted;
         } catch (Throwable e) {
             LOG.error("Error while scaling resource", e);
-            errorCounters
-                    .computeIfAbsent(resouceId, _id -> 
autoscalerMetricGroup.counter("errors"))
-                    .inc();
+            getOrInitAutoscalerFlinkMetrics(ctx, resourceId).numErrors.inc();
             eventRecorder.triggerEvent(
                     resource,
                     EventRecorder.Type.Warning,
@@ -136,56 +136,12 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         }
     }
 
-    private void registerResourceScalingMetrics(
-            AbstractFlinkResource<?, ?> resource, MetricGroup scalerGroup) {
-        var resourceId = ResourceID.fromResource(resource);
-
-        lastEvaluatedMetrics
-                .get(resourceId)
-                .forEach(
-                        (jobVertexID, evaluated) -> {
-                            if (!registeredMetrics
-                                    .computeIfAbsent(resourceId, r -> new 
HashSet<>())
-                                    .add(jobVertexID)) {
-                                return;
-                            }
-                            LOG.info("Registering scaling metrics for job 
vertex {}", jobVertexID);
-                            var jobVertexMg =
-                                    scalerGroup.addGroup("jobVertexID", 
jobVertexID.toHexString());
-
-                            evaluated.forEach(
-                                    (sm, esm) -> {
-                                        var smGroup = 
jobVertexMg.addGroup(sm.name());
-
-                                        smGroup.gauge(
-                                                "Current",
-                                                () ->
-                                                        Optional.ofNullable(
-                                                                        
lastEvaluatedMetrics.get(
-                                                                               
 resourceId))
-                                                                .map(m -> 
m.get(jobVertexID))
-                                                                .map(
-                                                                        
metrics ->
-                                                                               
 metrics.get(sm)
-                                                                               
         .getCurrent())
-                                                                .orElse(null));
-
-                                        if (sm.isCalculateAverage()) {
-                                            smGroup.gauge(
-                                                    "Average",
-                                                    () ->
-                                                            
Optional.ofNullable(
-                                                                            
lastEvaluatedMetrics
-                                                                               
     .get(
-                                                                               
             resourceId))
-                                                                    .map(m -> 
m.get(jobVertexID))
-                                                                    .map(
-                                                                            
metrics ->
-                                                                               
     metrics.get(sm)
-                                                                               
             .getAverage())
-                                                                    
.orElse(null));
-                                        }
-                                    });
-                        });
+    /**
+     * Returns the autoscaler metrics of the given resource, creating them on first access under
+     * the resource metric group's "AutoScaler" subgroup.
+     *
+     * @param ctx context providing the resource metric group
+     * @param resourceId id of the autoscaled resource
+     * @return the (possibly newly created) metrics holder for the resource
+     */
+    private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(
+            FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx, ResourceID resourceId) {
+        return this.flinkMetrics.computeIfAbsent(
+                resourceId,
+                id ->
+                        new AutoscalerFlinkMetrics(
+                                ctx.getResourceMetricGroup().addGroup("AutoScaler")));
     }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 6809dd86..67cb8300 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import lombok.Getter;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.BeforeEach;
@@ -156,6 +157,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         autoscaler.scale(getResourceContext(app, ctx));
         assertEquals(
                 1, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
+        assertFlinkMetricsCount(0, 0, ctx);
 
         now = now.plus(Duration.ofSeconds(1));
         setClocksTo(now);
@@ -166,6 +168,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
         assertEquals(4, scaledParallelism.get(source1));
         assertEquals(4, scaledParallelism.get(sink));
+        assertFlinkMetricsCount(1, 0, ctx);
 
         /* Test stability while processing pending records. */
 
@@ -202,6 +205,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2));
 
         autoscaler.scale(getResourceContext(app, ctx));
+        assertFlinkMetricsCount(1, 0, ctx);
         assertEquals(
                 1, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
         scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
@@ -232,6 +236,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         now = now.plus(Duration.ofSeconds(1));
         setClocksTo(now);
         autoscaler.scale(getResourceContext(app, ctx));
+        assertFlinkMetricsCount(1, 0, ctx);
         assertEquals(
                 2, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
         scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
@@ -263,6 +268,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         now = now.plus(Duration.ofSeconds(1));
         setClocksTo(now);
         autoscaler.scale(getResourceContext(app, ctx));
+        assertFlinkMetricsCount(2, 0, ctx);
         assertEquals(
                 3, AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().size());
 
@@ -300,6 +306,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         setClocksTo(now);
         
app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
         autoscaler.scale(getResourceContext(app, ctx));
+        assertFlinkMetricsCount(2, 1, ctx);
         scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
         assertEquals(2, scaledParallelism.get(source1));
         assertEquals(2, scaledParallelism.get(sink));
@@ -326,6 +333,8 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         now = now.plus(Duration.ofSeconds(1));
         setClocksTo(now);
         autoscaler.scale(getResourceContext(app, ctx));
+        assertFlinkMetricsCount(2, 2, ctx);
+
         scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
         assertEquals(2, scaledParallelism.get(source1));
         assertEquals(2, scaledParallelism.get(sink));
@@ -352,6 +361,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         now = now.plus(Duration.ofSeconds(1));
         setClocksTo(now);
         autoscaler.scale(getResourceContext(app, ctx));
+        assertFlinkMetricsCount(2, 3, ctx);
         scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
         assertEquals(2, scaledParallelism.get(source1));
         assertEquals(2, scaledParallelism.get(sink));
@@ -415,4 +425,13 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
             }
         };
     }
+
+    /**
+     * Checks the autoscaler's "scalings" and "balanced" counters for {@code app}.
+     *
+     * @param scalingCount expected count of the "scalings" counter
+     * @param balancedCount expected count of the "balanced" counter
+     * @param ctx test context used to build the resource context of {@code app}
+     */
+    private void assertFlinkMetricsCount(
+            int scalingCount, int balancedCount, TestUtils.TestingContext<HasMetadata> ctx) {
+        var resource = getResourceContext(app, ctx).getResource();
+        var metrics = autoscaler.flinkMetrics.get(ResourceID.fromResource(resource));
+        assertEquals(scalingCount, metrics.numScalings.getCount());
+        assertEquals(balancedCount, metrics.numBalanced.getCount());
+    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index 5303e5c7..8f478bf0 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Tests for JobAutoScalerImpl. */
 @EnableKubernetesMockClient(crud = true)
@@ -65,9 +66,11 @@ public class JobAutoScalerImplTest extends OperatorTestBase {
         ResourceID resourceId = ResourceID.fromResource(app);
 
         autoscaler.scale(resourceContext);
-        Assertions.assertEquals(1, 
autoscaler.errorCounters.get(resourceId).getCount());
+        Assertions.assertEquals(1, 
autoscaler.flinkMetrics.get(resourceId).numErrors.getCount());
 
         autoscaler.scale(resourceContext);
-        Assertions.assertEquals(2, 
autoscaler.errorCounters.get(resourceId).getCount());
+        Assertions.assertEquals(2, 
autoscaler.flinkMetrics.get(resourceId).numErrors.getCount());
+
+        assertEquals(0, 
autoscaler.flinkMetrics.get(resourceId).numScalings.getCount());
     }
 }

Reply via email to