This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d54190e [hotfix] Fix evaluation error during stabilization period
9d54190e is described below

commit 9d54190e99df1b936ad0ea07356c903301d65b3f
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jun 8 12:02:04 2023 +0200

    [hotfix] Fix evaluation error during stabilization period
---
 .../kubernetes/operator/autoscaler/JobAutoScalerImpl.java |  5 +++++
 .../operator/autoscaler/BacklogBasedScalingTest.java      | 15 +++++++++++++++
 2 files changed, 20 insertions(+)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 4bd53e4f..b05038a5 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -101,6 +101,11 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                     metricsCollector.updateMetrics(
                             resource, autoScalerInfo, ctx.getFlinkService(), 
conf);
 
+            if (collectedMetrics.getMetricHistory().isEmpty()) {
+                autoScalerInfo.replaceInKubernetes(kubernetesClient);
+                return false;
+            }
+
             LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
             var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
             LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 67cb8300..4ce069ca 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -405,6 +405,21 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         assertTrue(event.getMessage().startsWith("Could not parse"));
     }
 
+    @Test
+    public void testNoEvaluationDuringStabilization() {
+        var conf = configManager.getDefaultConfig();
+        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, 
Duration.ofMinutes(1));
+        configManager.updateDefaultConfig(conf);
+
+        var ctx = createAutoscalerTestContext();
+        var now = Instant.ofEpochMilli(0);
+        setClocksTo(now);
+        
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+        assertFalse(autoscaler.scale(getResourceContext(app, ctx)));
+        assertTrue(AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().isEmpty());
+        assertTrue(eventCollector.events.isEmpty());
+    }
+
     private void redeployJob(Instant now) {
         
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
     }

Reply via email to