This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 2bd6b1b38171ce821509d46d687291b876f72a2e
Author: Rui Fan <[email protected]>
AuthorDate: Mon Jan 22 15:24:40 2024 +0800

    [FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval`
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java | 22 +++++++++++-----------
 .../flink/autoscaler/ScalingMetricCollector.java   |  4 ++--
 .../apache/flink/autoscaler/ScalingTracking.java   | 12 +++++++-----
 .../autoscaler/metrics/CollectedMetricHistory.java |  1 +
 .../flink/autoscaler/BacklogBasedScalingTest.java  |  5 ++++-
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 20 ++++++++++----------
 6 files changed, 35 insertions(+), 29 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 04ab36e7..b3839ff4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -160,14 +160,23 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
         var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
         var jobTopology = collectedMetrics.getJobTopology();
 
+        var now = clock.instant();
+        var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
+        // A scaling tracking without an end time gets created whenever a 
scaling decision is
+        // applied. Here, we record the end time for it (runScalingLogic is 
only called when the job
+        // transitions back into the RUNNING state).
+        if 
(scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
+                collectedMetrics.getJobRunningTs(), jobTopology, 
scalingHistory)) {
+            stateStore.storeScalingTracking(ctx, scalingTracking);
+        }
+
         if (collectedMetrics.getMetricHistory().isEmpty()) {
             return;
         }
         LOG.debug("Collected metrics: {}", collectedMetrics);
 
-        var now = clock.instant();
         // Scaling tracking data contains previous restart times that are 
taken into account
-        var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
         var restartTime = 
scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration());
         var evaluatedMetrics =
                 evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
restartTime);
@@ -179,15 +188,6 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
-        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
-        // A scaling tracking without an end time gets created whenever a 
scaling decision is
-        // applied. Here, we record the end time for it (runScalingLogic is 
only called when the job
-        // transitions back into the RUNNING state).
-        if 
(scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
-                now, jobTopology, scalingHistory)) {
-            stateStore.storeScalingTracking(ctx, scalingTracking);
-        }
-
         if (!collectedMetrics.isFullyCollected()) {
             // We have done an upfront evaluation, but we are not ready for 
scaling.
             resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index fce91ea4..ed5a2e47 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -149,10 +149,10 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         if (isStabilizing) {
             LOG.info("Stabilizing until {}", readable(stableTime));
             stateStore.storeCollectedMetrics(ctx, metricHistory);
-            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
+            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap(), jobRunningTs);
         }
 
-        var collectedMetrics = new CollectedMetricHistory(topology, 
metricHistory);
+        var collectedMetrics = new CollectedMetricHistory(topology, 
metricHistory, jobRunningTs);
         if (now.isBefore(windowFullTime)) {
             LOG.info("Metric window not full until {}", 
readable(windowFullTime));
         } else {
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
index 0f11445b..af8a2f40 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
@@ -69,7 +69,8 @@ public class ScalingTracking {
      * Sets restart duration for the latest scaling record if its parallelism 
matches the current
      * job parallelism.
      *
-     * @param now The instant to be used as the end time when calculating the 
restart duration.
+     * @param jobRunningTs The instant when the JobStatus is switched to 
RUNNING, it will be used as
+     *     the end time when calculating the restart duration.
      * @param jobTopology The current job topology containing details of the 
job's parallelism.
      * @param scalingHistory The scaling history.
      * @return true if the restart duration is successfully recorded, false if 
the restart duration
@@ -77,7 +78,7 @@ public class ScalingTracking {
      *     not match the actual parallelism.
      */
     public boolean recordRestartDurationIfTrackedAndParallelismMatches(
-            Instant now,
+            Instant jobRunningTs,
             JobTopology jobTopology,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
         return getLatestScalingRecordEntry()
@@ -94,12 +95,13 @@ public class ScalingTracking {
                                 if (targetParallelismMatchesActual(
                                         targetParallelism, actualParallelism)) 
{
                                     value.setRestartDuration(
-                                            Duration.between(scalingTimestamp, 
now));
+                                            Duration.between(scalingTimestamp, 
jobRunningTs));
                                     LOG.debug(
                                             "Recorded restart duration of {} 
seconds (from {} till {})",
-                                            Duration.between(scalingTimestamp, 
now).getSeconds(),
+                                            Duration.between(scalingTimestamp, 
jobRunningTs)
+                                                    .getSeconds(),
                                             scalingTimestamp,
-                                            now);
+                                            jobRunningTs);
                                     return true;
                                 }
                             } else {
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
index 31a19879..442c6666 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
@@ -30,5 +30,6 @@ import java.util.SortedMap;
 public class CollectedMetricHistory {
     final JobTopology jobTopology;
     final SortedMap<Instant, CollectedMetrics> metricHistory;
+    final Instant jobRunningTs;
     @Setter private boolean fullyCollected;
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index d48f059b..bd71dba1 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -398,9 +398,12 @@ public class BacklogBasedScalingTest {
                 new JobTopology(
                         new VertexInfo(source1, Set.of(), 4, 720),
                         new VertexInfo(sink, Set.of(source1), 4, 720)));
+
+        var expectedEndTime = Instant.ofEpochMilli(10);
+        metricsCollector.setJobUpdateTs(expectedEndTime);
         autoscaler.scale(context);
 
-        assertLastTrackingEndTimeIs(now);
+        assertLastTrackingEndTimeIs(expectedEndTime);
     }
 
     private void assertLastTrackingEndTimeIs(Instant expectedEndTime) throws 
Exception {
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 260687e0..0bfbd989 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -118,7 +118,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
 
@@ -152,7 +152,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -175,7 +175,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -197,7 +197,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -243,7 +243,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -461,7 +461,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -476,7 +476,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -489,7 +489,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -531,7 +531,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -562,7 +562,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, 
metricHistory),
+                                new CollectedMetricHistory(topology, 
metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)

Reply via email to