This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 746a996732e56e573f7882764c82e2faa2cba71d Author: Rui Fan <[email protected]> AuthorDate: Mon Jan 29 11:40:18 2024 +0800 [FLINK-34178][autoscaler][refactor] Get the jobRunningTs from JobStatus.RUNNING instead of max timestamp Reason: The JobStatus may be changed during scaling --- .../flink/autoscaler/ScalingMetricCollector.java | 19 ++++++++++++------- .../flink/autoscaler/ScalingMetricCollectorTest.java | 2 +- .../flink/autoscaler/TestingMetricsCollector.java | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index 8f31ecf0..fce91ea4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -19,6 +19,7 @@ package org.apache.flink.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; @@ -69,6 +70,7 @@ import java.util.stream.Stream; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList; import static org.apache.flink.autoscaler.utils.AutoScalerUtils.excludeVerticesFromScaling; import static org.apache.flink.autoscaler.utils.DateTimeUtils.readable; +import static org.apache.flink.util.Preconditions.checkState; /** Metric collector using flink rest api. */ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerContext<KEY>> { @@ -104,17 +106,17 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC var jobDetailsInfo = getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT)); - var jobUpdateTs = getJobUpdateTs(jobDetailsInfo); + var jobRunningTs = getJobRunningTs(jobDetailsInfo); // We detect job change compared to our collected metrics by checking against the earliest // metric timestamp - if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) { - LOG.info("Job updated at {}. Clearing metrics.", readable(jobUpdateTs)); + if (!metricHistory.isEmpty() && jobRunningTs.isAfter(metricHistory.firstKey())) { + LOG.info("Job updated at {}. Clearing metrics.", readable(jobRunningTs)); stateStore.removeCollectedMetrics(ctx); cleanup(ctx.getJobKey()); metricHistory.clear(); } var topology = getJobTopology(ctx, stateStore, jobDetailsInfo); - var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); + var stableTime = jobRunningTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); final boolean isStabilizing = now.isBefore(stableTime); // Calculate timestamp when the metric windows is full @@ -181,9 +183,12 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC } @VisibleForTesting - protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) { - return Instant.ofEpochMilli( - jobDetailsInfo.getTimestamps().values().stream().max(Long::compare).get()); + protected Instant getJobRunningTs(JobDetailsInfo jobDetailsInfo) { + final Map<JobStatus, Long> timestamps = jobDetailsInfo.getTimestamps(); + + final Long runningTs = timestamps.get(JobStatus.RUNNING); + checkState(runningTs != null, "Unable to find when the job was switched to RUNNING."); + return Instant.ofEpochMilli(runningTs); } protected JobTopology getJobTopology( diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java index ed8f852d..730fcf63 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java @@ -95,7 +95,7 @@ public class ScalingMetricCollectorTest { Map.of(), new JobPlanInfo.RawJson("")); var metricsCollector = new RestApiMetricsCollector<>(); - assertEquals(Instant.ofEpochMilli(3L), metricsCollector.getJobUpdateTs(details)); + assertEquals(Instant.ofEpochMilli(3L), metricsCollector.getJobRunningTs(details)); } @Test diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java index 1731bc64..d5a1b2f2 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java @@ -97,7 +97,7 @@ public class TestingMetricsCollector<KEY, Context extends JobAutoScalerContext<K } @Override - protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) { + protected Instant getJobRunningTs(JobDetailsInfo jobDetailsInfo) { return jobUpdateTs; } }
