This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 9d54190e [hotfix] Fix evaluation error during stabilization period
9d54190e is described below
commit 9d54190e99df1b936ad0ea07356c903301d65b3f
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jun 8 12:02:04 2023 +0200
[hotfix] Fix evaluation error during stabilization period
---
.../kubernetes/operator/autoscaler/JobAutoScalerImpl.java | 5 +++++
.../operator/autoscaler/BacklogBasedScalingTest.java | 15 +++++++++++++++
2 files changed, 20 insertions(+)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 4bd53e4f..b05038a5 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -101,6 +101,11 @@ public class JobAutoScalerImpl implements JobAutoScaler {
metricsCollector.updateMetrics(
resource, autoScalerInfo, ctx.getFlinkService(),
conf);
+ if (collectedMetrics.getMetricHistory().isEmpty()) {
+ autoScalerInfo.replaceInKubernetes(kubernetesClient);
+ return false;
+ }
+
LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
LOG.debug("Scaling metrics evaluated: {}", evaluatedMetrics);
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 67cb8300..4ce069ca 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -405,6 +405,21 @@ public class BacklogBasedScalingTest extends
OperatorTestBase {
assertTrue(event.getMessage().startsWith("Could not parse"));
}
+ @Test
+ public void testNoEvaluationDuringStabilization() {
+ var conf = configManager.getDefaultConfig();
+ conf.set(AutoScalerOptions.STABILIZATION_INTERVAL,
Duration.ofMinutes(1));
+ configManager.updateDefaultConfig(conf);
+
+ var ctx = createAutoscalerTestContext();
+ var now = Instant.ofEpochMilli(0);
+ setClocksTo(now);
+
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+ assertFalse(autoscaler.scale(getResourceContext(app, ctx)));
+ assertTrue(AutoScalerInfo.forResource(app,
kubernetesClient).getMetricHistory().isEmpty());
+ assertTrue(eventCollector.events.isEmpty());
+ }
+
private void redeployJob(Instant now) {
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
}