This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c22bf6d [FLINK-30463] Start stabilization period only after job goes into RUNNING (#492)
1c22bf6d is described below

commit 1c22bf6d789cbe3aed223de5b54b87518218181e
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Dec 21 20:04:58 2022 +0100

    [FLINK-30463] Start stabilization period only after job goes into RUNNING (#492)
    
    We were tracking the stabilization period from the job start time, which can
    vary greatly from the time the job goes into the RUNNING state. This can result
    in no stabilization period at all, leading to poor scaling decisions.
    
    Thus, we change the logic to use the update time in combination with a RUNNING
    job status.
---
 .../operator/autoscaler/ScalingExecutor.java       | 16 +++++++++++++---
 .../autoscaler/BacklogBasedScalingTest.java        |  4 +++-
 .../MetricsCollectionAndEvaluationTest.java        |  3 +++
 .../operator/autoscaler/ScalingExecutorTest.java   | 22 +++++++++++++---------
 4 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index afd9d2fc..6a2353ab 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -121,13 +122,22 @@ public class ScalingExecutor implements Cleanup {
 
     private boolean stabilizationPeriodPassed(
             AbstractFlinkResource<?, ?> resource, Configuration conf) {
-        var now = clock.instant();
+        var jobStatus = resource.getStatus().getJobStatus();
+
+        if (!JobStatus.RUNNING.name().equals(jobStatus.getState())) {
+            // Never consider a non-running job stable
+            return false;
+        }
+
         var startTs =
                 Instant.ofEpochMilli(
-                        
Long.parseLong(resource.getStatus().getJobStatus().getStartTime()));
+                        // Use the update time which will reflect the latest 
job state update
+                        // Do not use the start time because it doesn't tell 
when the job went to
+                        // RUNNING
+                        Long.parseLong(jobStatus.getUpdateTime()));
         var stableTime = startTs.plus(conf.get(STABILIZATION_INTERVAL));
 
-        if (stableTime.isAfter(now)) {
+        if (stableTime.isAfter(clock.instant())) {
             LOG.info("Waiting until {} to stabilize before new scale 
operation.", stableTime);
             return false;
         }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 1e1466c8..0d043a80 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -118,7 +118,9 @@ public class BacklogBasedScalingTest {
         var ctx = createAutoscalerTestContext();
         var now = Instant.now();
         setClocksTo(now);
-        
app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        String startTime = String.valueOf(now.toEpochMilli());
+        app.getStatus().getJobStatus().setStartTime(startTime);
+        app.getStatus().getJobStatus().setUpdateTime(startTime);
         metricsCollector.setCurrentMetrics(
                 Map.of(
                         source1,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 11fc16a9..29b92abb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
@@ -103,6 +104,8 @@ public class MetricsCollectionAndEvaluationTest {
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
         ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
         
app.getStatus().getJobStatus().setStartTime(String.valueOf(System.currentTimeMillis()));
+        
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(System.currentTimeMillis()));
+        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index e819fd89..9aa67145 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
@@ -66,9 +67,10 @@ public class ScalingExecutorTest {
 
         flinkDep = TestUtils.buildApplicationCluster();
         kubernetesClient.resource(flinkDep).createOrReplace();
-        flinkDep.getStatus()
-                .getJobStatus()
-                .setStartTime(String.valueOf(System.currentTimeMillis()));
+        var jobStatus = flinkDep.getStatus().getJobStatus();
+        jobStatus.setStartTime(String.valueOf(System.currentTimeMillis()));
+        jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+        jobStatus.setState(JobStatus.RUNNING.name());
     }
 
     @Test
@@ -79,9 +81,8 @@ public class ScalingExecutorTest {
 
         var scalingInfo = new AutoScalerInfo(new HashMap<>());
         var clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
-        flinkDep.getStatus()
-                .getJobStatus()
-                .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+        var jobStatus = flinkDep.getStatus().getJobStatus();
+        
jobStatus.setUpdateTime(String.valueOf(clock.instant().toEpochMilli()));
 
         scalingDecisionExecutor.setClock(clock);
         assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, 
scalingInfo, conf, metrics));
@@ -98,9 +99,12 @@ public class ScalingExecutorTest {
         scalingDecisionExecutor.setClock(clock);
         assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, 
scalingInfo, conf, metrics));
 
-        flinkDep.getStatus()
-                .getJobStatus()
-                .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+        // A job should not be considered stable in a non-RUNNING state
+        jobStatus.setState(JobStatus.FAILING.name());
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, 
scalingInfo, conf, metrics));
+
+        jobStatus.setState(JobStatus.RUNNING.name());
+        
jobStatus.setUpdateTime(String.valueOf(clock.instant().toEpochMilli()));
         assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, 
scalingInfo, conf, metrics));
 
         clock = Clock.offset(clock, Duration.ofSeconds(59));

Reply via email to