This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f6496f58 [FLINK-30593][autoscaler] Improve restart time tracking (#735)
f6496f58 is described below

commit f6496f582ff4ea6ab177bcff7988f3ab2e50df60
Author: Alexander Fedulov <[email protected]>
AuthorDate: Wed Jan 10 11:34:16 2024 +0100

    [FLINK-30593][autoscaler] Improve restart time tracking (#735)
    
    - Adds more debug logs
    - Stores restart Duration directly instead of the endTime Instant
    - Fixes a bug that makes restart duration tracking dependent on whether
      metrics are considered fully collected
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java | 19 +++++-----
 .../apache/flink/autoscaler/ScalingExecutor.java   |  1 +
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  2 +-
 .../org/apache/flink/autoscaler/ScalingRecord.java |  4 +--
 .../apache/flink/autoscaler/ScalingTracking.java   | 42 ++++++++++++++--------
 .../flink/autoscaler/BacklogBasedScalingTest.java  | 10 ++++--
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  6 ++--
 .../flink/autoscaler/ScalingTrackingTest.java      | 39 ++++++++++++--------
 8 files changed, 74 insertions(+), 49 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 2d477145..04ab36e7 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -179,22 +179,21 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
+        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
+        // A scaling tracking without an end time gets created whenever a 
scaling decision is
+        // applied. Here, we record the end time for it (runScalingLogic is 
only called when the job
+        // transitions back into the RUNNING state).
+        if 
(scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
+                now, jobTopology, scalingHistory)) {
+            stateStore.storeScalingTracking(ctx, scalingTracking);
+        }
+
         if (!collectedMetrics.isFullyCollected()) {
             // We have done an upfront evaluation, but we are not ready for 
scaling.
             resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
             return;
         }
 
-        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
-        // A scaling tracking without an end time gets created whenever a 
scaling decision is
-        // applied. Here, when the job transitions to RUNNING, we record the 
time for it.
-        if (ctx.getJobStatus() == JobStatus.RUNNING) {
-            if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
-                    now, jobTopology, scalingHistory)) {
-                stateStore.storeScalingTracking(ctx, scalingTracking);
-            }
-        }
-
         var parallelismChanged =
                 scalingExecutor.scaleResource(
                         ctx, evaluatedMetrics, scalingHistory, 
scalingTracking, now);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 5d5b0186..680a4135 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -177,6 +177,7 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
             Duration restartTime) {
+        LOG.debug("Restart time used in scaling summary computation: {}", 
restartTime);
 
         if (isJobUnderMemoryPressure(context, 
evaluatedMetrics.getGlobalMetrics())) {
             LOG.info("Skipping vertex scaling due to memory pressure");
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index ef2bfa17..e4c5f0d4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -69,7 +69,7 @@ public class ScalingMetricEvaluator {
 
     public EvaluatedMetrics evaluate(
             Configuration conf, CollectedMetricHistory collectedMetrics, 
Duration restartTime) {
-
+        LOG.debug("Restart time used in metrics evaluation: {}", restartTime);
         var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>();
         var metricsHistory = collectedMetrics.getMetricHistory();
         var topology = collectedMetrics.getJobTopology();
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
index b4eba37a..b36e2e3b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
@@ -21,7 +21,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.time.Instant;
+import java.time.Duration;
 
 /**
  * Class for tracking scaling details, including time it took for the job to 
transition to the
@@ -31,5 +31,5 @@ import java.time.Instant;
 @NoArgsConstructor
 @AllArgsConstructor
 public class ScalingRecord {
-    private Instant endTime;
+    private Duration restartDuration;
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
index bb86d3b1..064bc68d 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
@@ -65,17 +65,17 @@ public class ScalingTracking {
     }
 
     /**
-     * Sets the end time for the latest scaling record if its parallelism 
matches the current job
-     * parallelism.
+     * Sets restart duration for the latest scaling record if its parallelism 
matches the current
+     * job parallelism.
      *
-     * @param now The current instant to be set as the end time of the scaling 
record.
+     * @param now The instant to be used as the end time when calculating the 
restart duration.
      * @param jobTopology The current job topology containing details of the 
job's parallelism.
      * @param scalingHistory The scaling history.
-     * @return true if the end time is successfully set, false if the end time 
is already set, the
-     *     latest scaling record cannot be found, or the target parallelism 
does not match the
-     *     actual parallelism.
+     * @return true if the restart duration is successfully recorded, false if 
the restart duration
+     *     is already set, the latest scaling record cannot be found, or the 
target parallelism does
+     *     not match the actual parallelism.
      */
-    public boolean setEndTimeIfTrackedAndParallelismMatches(
+    public boolean recordRestartDurationIfTrackedAndParallelismMatches(
             Instant now,
             JobTopology jobTopology,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
@@ -84,7 +84,7 @@ public class ScalingTracking {
                         entry -> {
                             var value = entry.getValue();
                             var scalingTimestamp = entry.getKey();
-                            if (value.getEndTime() == null) {
+                            if (value.getRestartDuration() == null) {
                                 var targetParallelism =
                                         getTargetParallelismOfScaledVertices(
                                                 scalingTimestamp, 
scalingHistory);
@@ -92,7 +92,8 @@ public class ScalingTracking {
 
                                 if (targetParallelismMatchesActual(
                                         targetParallelism, actualParallelism)) 
{
-                                    value.setEndTime(now);
+                                    value.setRestartDuration(
+                                            Duration.between(scalingTimestamp, 
now));
                                     LOG.debug(
                                             "Recorded restart duration of {} 
seconds (from {} till {})",
                                             Duration.between(scalingTimestamp, 
now).getSeconds(),
@@ -100,6 +101,10 @@ public class ScalingTracking {
                                             now);
                                     return true;
                                 }
+                            } else {
+                                LOG.debug(
+                                        "Cannot record restart duration 
because already set in the latest record: {}",
+                                        value.getRestartDuration());
                             }
                             return false;
                         })
@@ -129,7 +134,15 @@ public class ScalingTracking {
                             var vertexID = entry.getKey();
                             var targetParallelism = entry.getValue();
                             var actualParallelism = 
actualParallelisms.getOrDefault(vertexID, -1);
-                            return actualParallelism.equals(targetParallelism);
+                            boolean isEqual = 
actualParallelism.equals(targetParallelism);
+                            if (!isEqual) {
+                                LOG.debug(
+                                        "Vertex {} actual parallelism {} does 
not match target parallelism {}",
+                                        vertexID,
+                                        actualParallelism,
+                                        targetParallelism);
+                            }
+                            return isEqual;
                         });
     }
 
@@ -143,13 +156,12 @@ public class ScalingTracking {
         long maxRestartTime = -1;
         if (conf.get(AutoScalerOptions.PREFER_TRACKED_RESTART_TIME)) {
             for (Map.Entry<Instant, ScalingRecord> entry : 
scalingRecords.entrySet()) {
-                var startTime = entry.getKey();
-                var endTime = entry.getValue().getEndTime();
-                if (endTime != null) {
-                    var restartTime = Duration.between(startTime, 
endTime).toSeconds();
-                    maxRestartTime = Math.max(restartTime, maxRestartTime);
+                var restartDuration = entry.getValue().getRestartDuration();
+                if (restartDuration != null) {
+                    maxRestartTime = Math.max(restartDuration.toSeconds(), 
maxRestartTime);
                 }
             }
+            LOG.debug("Maximum tracked restart time: {}", maxRestartTime);
         }
         var restartTimeFromConfig = conf.get(AutoScalerOptions.RESTART_TIME);
         long maxRestartTimeFromConfig =
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 0ec2ae8c..d48f059b 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -346,7 +346,7 @@ public class BacklogBasedScalingTest {
     }
 
     @Test
-    public void shouldTrackEndOfScalingTimeCorrectly() throws Exception {
+    public void shouldTrackRestartDurationCorrectly() throws Exception {
         var now = Instant.ofEpochMilli(0);
         setClocksTo(now);
         metricsCollector.setJobUpdateTs(now);
@@ -406,7 +406,13 @@ public class BacklogBasedScalingTest {
     private void assertLastTrackingEndTimeIs(Instant expectedEndTime) throws 
Exception {
         var scalingTracking = stateStore.getScalingTracking(context);
         var latestScalingRecordEntry = 
scalingTracking.getLatestScalingRecordEntry().get();
-        
assertThat(latestScalingRecordEntry.getValue().getEndTime()).isEqualTo(expectedEndTime);
+        var startTime = latestScalingRecordEntry.getKey();
+        var restartDuration = 
latestScalingRecordEntry.getValue().getRestartDuration();
+        if (expectedEndTime == null) {
+            assertThat(restartDuration).isNull();
+        } else {
+            assertThat(restartDuration).isEqualTo(Duration.between(startTime, 
expectedEndTime));
+        }
     }
 
     @Test
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 07662bae..8202cf37 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -288,11 +288,9 @@ public class ScalingMetricEvaluatorTest {
 
         var scalingTracking = new ScalingTracking();
         scalingTracking.addScalingRecord(
-                Instant.parse("2023-11-15T16:00:00.00Z"),
-                new ScalingRecord(Instant.parse("2023-11-15T16:03:00.00Z")));
+                Instant.parse("2023-11-15T16:00:00.00Z"), new 
ScalingRecord(Duration.ofMinutes(3)));
         scalingTracking.addScalingRecord(
-                Instant.parse("2023-11-15T16:20:00.00Z"),
-                new ScalingRecord(Instant.parse("2023-11-15T16:25:00.00Z")));
+                Instant.parse("2023-11-15T16:20:00.00Z"), new 
ScalingRecord(Duration.ofMinutes(5)));
 
         var restartTimeSec = scalingTracking.getMaxRestartTimeOrDefault(conf);
         // Restart time does not factor in
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
index 6a5fcba6..85cb65d8 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
@@ -106,29 +106,35 @@ class ScalingTrackingTest {
     }
 
     @Test
-    void shouldSetEndTime_WhenParallelismMatches() {
+    void shouldSetRestartDuration_WhenParallelismMatches() {
         var now = Instant.now();
-        var lastScaling = now.minusSeconds(60);
-        addScalingRecordWithoutEndTime(lastScaling);
+        var restartDuration = Duration.ofSeconds(60);
+        var lastScaling = now.minusSeconds(restartDuration.getSeconds());
+        addScalingRecordWithoutRestartDuration(lastScaling);
         var actualParallelisms = initActualParallelisms();
         var jobTopology = new 
JobTopology(createVertexInfoSet(actualParallelisms));
         var scalingHistory =
                 initScalingHistoryWithTargetParallelism(lastScaling, 
actualParallelisms);
 
         boolean result =
-                scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+                
scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                         now, jobTopology, scalingHistory);
 
         assertThat(result).isTrue();
-        
assertThat(scalingTracking.getLatestScalingRecordEntry().get().getValue().getEndTime())
-                .isEqualTo(now);
+        assertThat(
+                        scalingTracking
+                                .getLatestScalingRecordEntry()
+                                .get()
+                                .getValue()
+                                .getRestartDuration())
+                .isEqualTo(restartDuration);
     }
 
     @Test
-    void shouldNotSetEndTime_WhenParallelismDoesNotMatch() {
+    void shouldNotSetRestartDuration_WhenParallelismDoesNotMatch() {
         var now = Instant.now();
         var lastScaling = now.minusSeconds(60);
-        addScalingRecordWithoutEndTime(lastScaling);
+        addScalingRecordWithoutRestartDuration(lastScaling);
         var actualParallelisms = initActualParallelisms();
         var jobTopology = new 
JobTopology(createVertexInfoSet(actualParallelisms));
         var mismatchedParallelisms = new HashMap<>(actualParallelisms);
@@ -137,11 +143,16 @@ class ScalingTrackingTest {
                 initScalingHistoryWithTargetParallelism(lastScaling, 
mismatchedParallelisms);
 
         boolean result =
-                scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+                
scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                         now, jobTopology, scalingHistory);
 
         assertThat(result).isFalse();
-        
assertThat(scalingTracking.getLatestScalingRecordEntry().get().getValue().getEndTime())
+        assertThat(
+                        scalingTracking
+                                .getLatestScalingRecordEntry()
+                                .get()
+                                .getValue()
+                                .getRestartDuration())
                 .isNull();
     }
 
@@ -183,15 +194,13 @@ class ScalingTrackingTest {
 
     private void setUpScalingRecords(Duration secondRescaleDuration) {
         scalingTracking.addScalingRecord(
-                Instant.parse("2023-11-15T16:00:00.00Z"),
-                new ScalingRecord(Instant.parse("2023-11-15T16:03:00.00Z")));
+                Instant.parse("2023-11-15T16:00:00.00Z"), new 
ScalingRecord(Duration.ofMinutes(3)));
         var secondRecordStart = Instant.parse("2023-11-15T16:20:00.00Z");
         scalingTracking.addScalingRecord(
-                secondRecordStart,
-                new 
ScalingRecord(secondRecordStart.plus(secondRescaleDuration)));
+                secondRecordStart, new ScalingRecord(secondRescaleDuration));
     }
 
-    private void addScalingRecordWithoutEndTime(Instant startTime) {
+    private void addScalingRecordWithoutRestartDuration(Instant startTime) {
         ScalingRecord record = new ScalingRecord();
         scalingTracking.addScalingRecord(startTime, record);
     }

Reply via email to