This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 746a996732e56e573f7882764c82e2faa2cba71d
Author: Rui Fan <[email protected]>
AuthorDate: Mon Jan 29 11:40:18 2024 +0800

    [FLINK-34178][autoscaler][refactor] Take jobRunningTs from the JobStatus.RUNNING timestamp instead of the maximum status timestamp
    
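    Reason: The JobStatus may change during scaling, so the maximum timestamp across all job statuses does not necessarily reflect when the job switched to RUNNING.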
---
 .../flink/autoscaler/ScalingMetricCollector.java      | 19 ++++++++++++-------
 .../flink/autoscaler/ScalingMetricCollectorTest.java  |  2 +-
 .../flink/autoscaler/TestingMetricsCollector.java     |  2 +-
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index 8f31ecf0..fce91ea4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
@@ -69,6 +70,7 @@ import java.util.stream.Stream;
 import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
 import static 
org.apache.flink.autoscaler.utils.AutoScalerUtils.excludeVerticesFromScaling;
 import static org.apache.flink.autoscaler.utils.DateTimeUtils.readable;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** Metric collector using flink rest api. */
 public abstract class ScalingMetricCollector<KEY, Context extends 
JobAutoScalerContext<KEY>> {
@@ -104,17 +106,17 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC
 
         var jobDetailsInfo =
                 getJobDetailsInfo(ctx, 
conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
-        var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
+        var jobRunningTs = getJobRunningTs(jobDetailsInfo);
         // We detect job change compared to our collected metrics by checking 
against the earliest
         // metric timestamp
-        if (!metricHistory.isEmpty() && 
jobUpdateTs.isAfter(metricHistory.firstKey())) {
-            LOG.info("Job updated at {}. Clearing metrics.", 
readable(jobUpdateTs));
+        if (!metricHistory.isEmpty() && 
jobRunningTs.isAfter(metricHistory.firstKey())) {
+            LOG.info("Job updated at {}. Clearing metrics.", 
readable(jobRunningTs));
             stateStore.removeCollectedMetrics(ctx);
             cleanup(ctx.getJobKey());
             metricHistory.clear();
         }
         var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
-        var stableTime = 
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
+        var stableTime = 
jobRunningTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
         final boolean isStabilizing = now.isBefore(stableTime);
 
         // Calculate timestamp when the metric windows is full
@@ -181,9 +183,12 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC
     }
 
     @VisibleForTesting
-    protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) {
-        return Instant.ofEpochMilli(
-                
jobDetailsInfo.getTimestamps().values().stream().max(Long::compare).get());
+    protected Instant getJobRunningTs(JobDetailsInfo jobDetailsInfo) {
+        final Map<JobStatus, Long> timestamps = jobDetailsInfo.getTimestamps();
+
+        final Long runningTs = timestamps.get(JobStatus.RUNNING);
+        checkState(runningTs != null, "Unable to find when the job was 
switched to RUNNING.");
+        return Instant.ofEpochMilli(runningTs);
     }
 
     protected JobTopology getJobTopology(
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index ed8f852d..730fcf63 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -95,7 +95,7 @@ public class ScalingMetricCollectorTest {
                         Map.of(),
                         new JobPlanInfo.RawJson(""));
         var metricsCollector = new RestApiMetricsCollector<>();
-        assertEquals(Instant.ofEpochMilli(3L), 
metricsCollector.getJobUpdateTs(details));
+        assertEquals(Instant.ofEpochMilli(3L), 
metricsCollector.getJobRunningTs(details));
     }
 
     @Test
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
index 1731bc64..d5a1b2f2 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
@@ -97,7 +97,7 @@ public class TestingMetricsCollector<KEY, Context extends JobAutoScalerContext<K
     }
 
     @Override
-    protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) {
+    protected Instant getJobRunningTs(JobDetailsInfo jobDetailsInfo) {
         return jobUpdateTs;
     }
 }

Reply via email to