This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b11d6e18 [FLINK-31684] Report autoscaling metrics before metric window is full (#606)
b11d6e18 is described below

commit b11d6e1861ae1e7c9f6f0fdb4446b4c76420612e
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed May 24 20:05:26 2023 +0200

    [FLINK-31684] Report autoscaling metrics before metric window is full (#606)
    
    The metrics get reported only after the metric window is full. This is not
    helpful for observability after rescaling. We need to make sure that metrics
    are reported even when the metric window is not yet full.
---
 .../operator/autoscaler/JobAutoScalerImpl.java      | 11 ++++++-----
 .../operator/autoscaler/ScalingMetricCollector.java | 15 +++++++--------
 .../autoscaler/metrics/CollectedMetricHistory.java  | 10 ++++++----
 .../MetricsCollectionAndEvaluationTest.java         | 21 +++++++++++++++------
 4 files changed, 34 insertions(+), 23 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index d340b249..20571643 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -101,17 +101,18 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                     metricsCollector.updateMetrics(
                             resource, autoScalerInfo, ctx.getFlinkService(), 
conf);
 
-            if (collectedMetrics.getMetricHistory().isEmpty()) {
-                autoScalerInfo.replaceInKubernetes(kubernetesClient);
-                return false;
-            }
-
             LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
             var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
             LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
             lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
             flinkMetrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceId));
 
+            if (!collectedMetrics.isFullyCollected()) {
+                // We have done an upfront evaluation, but we are not ready 
for scaling.
+                autoScalerInfo.replaceInKubernetes(kubernetesClient);
+                return false;
+            }
+
             var specAdjusted =
                     scalingExecutor.scaleResource(resource, autoScalerInfo, 
conf, evaluatedMetrics);
             if (specAdjusted) {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 18478a9b..0fb65242 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -128,17 +128,16 @@ public abstract class ScalingMetricCollector {
         metricHistory.put(now, scalingMetrics);
         autoscalerInfo.updateMetricHistory(metricHistory);
 
+        var collectedMetrics = new CollectedMetricHistory(topology, 
metricHistory);
+
         var windowFullTime = metricCollectionStartTs.plus(metricWindowSize);
-        if (now.isBefore(windowFullTime)) {
-            // As long as we haven't had time to collect a full window,
-            // collect metrics but do not return any metrics
-            LOG.info(
-                    "Waiting until {} so the initial metric window is full 
before starting scaling",
-                    windowFullTime);
-            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
+        collectedMetrics.setFullyCollected(!now.isBefore(windowFullTime));
+
+        if (!collectedMetrics.isFullyCollected()) {
+            LOG.info("Metric window not full until {}", windowFullTime);
         }
 
-        return new CollectedMetricHistory(topology, metricHistory);
+        return collectedMetrics;
     }
 
     protected Duration getMetricWindowSize(Configuration conf) {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
index 33243ec0..43bbf47b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
@@ -19,14 +19,16 @@ package 
org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 
-import lombok.Value;
+import lombok.Data;
+import lombok.Setter;
 
 import java.time.Instant;
 import java.util.SortedMap;
 
 /** Topology and collected metric history. */
-@Value
+@Data
 public class CollectedMetricHistory {
-    JobTopology jobTopology;
-    SortedMap<Instant, CollectedMetrics> metricHistory;
+    final JobTopology jobTopology;
+    final SortedMap<Instant, CollectedMetrics> metricHistory;
+    @Setter private boolean fullyCollected;
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 7bef5609..c019d7aa 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -146,14 +146,16 @@ public class MetricsCollectionAndEvaluationTest {
         clock = Clock.offset(clock, 
conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
         metricsCollector.setClock(clock);
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
-        assertTrue(collectedMetrics.getMetricHistory().isEmpty());
+        assertEquals(1, collectedMetrics.getMetricHistory().size());
+        assertFalse(collectedMetrics.isFullyCollected());
 
         // We haven't collected a full window yet
         // => no metrics should be reported but metrics should still get 
collected.
         clock = Clock.offset(clock, Duration.ofSeconds(1));
         metricsCollector.setClock(clock);
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
-        assertTrue(collectedMetrics.getMetricHistory().isEmpty());
+        assertEquals(2, collectedMetrics.getMetricHistory().size());
+        assertFalse(collectedMetrics.isFullyCollected());
 
         // Advance time to stabilization period + full window => metrics 
should be present
         clock =
@@ -165,6 +167,7 @@ public class MetricsCollectionAndEvaluationTest {
         metricsCollector.setClock(clock);
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
         assertEquals(3, collectedMetrics.getMetricHistory().size());
+        assertTrue(collectedMetrics.isFullyCollected());
 
         // Test resetting the collector and make sure we can deserialize the 
scalingInfo correctly
         metricsCollector = new TestingMetricsCollector(topology);
@@ -172,6 +175,7 @@ public class MetricsCollectionAndEvaluationTest {
         setDefaultMetrics(metricsCollector);
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
         assertEquals(3, collectedMetrics.getMetricHistory().size());
+        assertTrue(collectedMetrics.isFullyCollected());
 
         var evaluation = evaluator.evaluate(conf, collectedMetrics);
         scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
@@ -318,17 +322,20 @@ public class MetricsCollectionAndEvaluationTest {
         // This call will lead to metric collection but we haven't reached the 
window size yet
         // which will hold back metrics
         metricsHistory = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
-        assertEquals(0, metricsHistory.getMetricHistory().size());
+        assertEquals(1, metricsHistory.getMetricHistory().size());
+        assertFalse(metricsHistory.isFullyCollected());
 
         // Collect more values in window
         metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
         metricsHistory = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
-        assertEquals(0, metricsHistory.getMetricHistory().size());
+        assertEquals(2, metricsHistory.getMetricHistory().size());
+        assertFalse(metricsHistory.isFullyCollected());
 
         // Window size reached
         metricsCollector.setClock(Clock.offset(clock, 
conf.get(AutoScalerOptions.METRICS_WINDOW)));
         metricsHistory = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
         assertEquals(3, metricsHistory.getMetricHistory().size());
+        assertTrue(metricsHistory.isFullyCollected());
 
         // Window size + 1 will invalidate the first metric
         metricsCollector.setClock(
@@ -447,12 +454,14 @@ public class MetricsCollectionAndEvaluationTest {
         metricsCollector.setClock(clock);
 
         var collectedMetrics = metricsCollector.updateMetrics(app, 
scalingInfo, service, conf);
-        assertTrue(collectedMetrics.getMetricHistory().isEmpty());
+        assertEquals(1, collectedMetrics.getMetricHistory().size());
+        assertFalse(collectedMetrics.isFullyCollected());
 
         metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(2)));
 
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
-        assertFalse(collectedMetrics.getMetricHistory().isEmpty());
+        assertEquals(2, collectedMetrics.getMetricHistory().size());
+        assertTrue(collectedMetrics.isFullyCollected());
 
         return collectedMetrics;
     }

Reply via email to