This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f6496f58 [FLINK-30593][autoscaler] Improve restart time tracking (#735)
f6496f58 is described below
commit f6496f582ff4ea6ab177bcff7988f3ab2e50df60
Author: Alexander Fedulov <[email protected]>
AuthorDate: Wed Jan 10 11:34:16 2024 +0100
[FLINK-30593][autoscaler] Improve restart time tracking (#735)
- Adds more debug logs
- Stores restart Duration directly instead of the endTime Instant
- Fixes a bug that makes restart duration tracking dependent on whether
metrics are considered fully collected
---
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 19 +++++-----
.../apache/flink/autoscaler/ScalingExecutor.java | 1 +
.../flink/autoscaler/ScalingMetricEvaluator.java | 2 +-
.../org/apache/flink/autoscaler/ScalingRecord.java | 4 +--
.../apache/flink/autoscaler/ScalingTracking.java | 42 ++++++++++++++--------
.../flink/autoscaler/BacklogBasedScalingTest.java | 10 ++++--
.../autoscaler/ScalingMetricEvaluatorTest.java | 6 ++--
.../flink/autoscaler/ScalingTrackingTest.java | 39 ++++++++++++--------
8 files changed, 74 insertions(+), 49 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 2d477145..04ab36e7 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -179,22 +179,21 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
jobTopology.getVerticesInTopologicalOrder(),
() -> lastEvaluatedMetrics.get(ctx.getJobKey()));
+ var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
+ // A scaling tracking without an end time gets created whenever a scaling decision is
+ // applied. Here, we record the end time for it (runScalingLogic is only called when the job
+ // transitions back into the RUNNING state).
+ if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
+ now, jobTopology, scalingHistory)) {
+ stateStore.storeScalingTracking(ctx, scalingTracking);
+ }
+
if (!collectedMetrics.isFullyCollected()) {
// We have done an upfront evaluation, but we are not ready for
scaling.
resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
return;
}
- var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
- // A scaling tracking without an end time gets created whenever a scaling decision is
- // applied. Here, when the job transitions to RUNNING, we record the time for it.
- if (ctx.getJobStatus() == JobStatus.RUNNING) {
- if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
- now, jobTopology, scalingHistory)) {
- stateStore.storeScalingTracking(ctx, scalingTracking);
- }
- }
-
var parallelismChanged =
scalingExecutor.scaleResource(
ctx, evaluatedMetrics, scalingHistory,
scalingTracking, now);
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 5d5b0186..680a4135 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -177,6 +177,7 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory,
Duration restartTime) {
+ LOG.debug("Restart time used in scaling summary computation: {}",
restartTime);
if (isJobUnderMemoryPressure(context,
evaluatedMetrics.getGlobalMetrics())) {
LOG.info("Skipping vertex scaling due to memory pressure");
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index ef2bfa17..e4c5f0d4 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -69,7 +69,7 @@ public class ScalingMetricEvaluator {
public EvaluatedMetrics evaluate(
Configuration conf, CollectedMetricHistory collectedMetrics,
Duration restartTime) {
-
+ LOG.debug("Restart time used in metrics evaluation: {}", restartTime);
var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>();
var metricsHistory = collectedMetrics.getMetricHistory();
var topology = collectedMetrics.getJobTopology();
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
index b4eba37a..b36e2e3b 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java
@@ -21,7 +21,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import java.time.Instant;
+import java.time.Duration;
/**
* Class for tracking scaling details, including time it took for the job to
transition to the
@@ -31,5 +31,5 @@ import java.time.Instant;
@NoArgsConstructor
@AllArgsConstructor
public class ScalingRecord {
- private Instant endTime;
+ private Duration restartDuration;
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
index bb86d3b1..064bc68d 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
@@ -65,17 +65,17 @@ public class ScalingTracking {
}
/**
- * Sets the end time for the latest scaling record if its parallelism
matches the current job
- * parallelism.
+ * Sets restart duration for the latest scaling record if its parallelism
matches the current
+ * job parallelism.
*
- * @param now The current instant to be set as the end time of the scaling
record.
+ * @param now The instant to be used as the end time when calculating the
restart duration.
* @param jobTopology The current job topology containing details of the
job's parallelism.
* @param scalingHistory The scaling history.
- * @return true if the end time is successfully set, false if the end time
is already set, the
- * latest scaling record cannot be found, or the target parallelism
does not match the
- * actual parallelism.
+ * @return true if the restart duration is successfully recorded, false if
the restart duration
+ * is already set, the latest scaling record cannot be found, or the
target parallelism does
+ * not match the actual parallelism.
*/
- public boolean setEndTimeIfTrackedAndParallelismMatches(
+ public boolean recordRestartDurationIfTrackedAndParallelismMatches(
Instant now,
JobTopology jobTopology,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory) {
@@ -84,7 +84,7 @@ public class ScalingTracking {
entry -> {
var value = entry.getValue();
var scalingTimestamp = entry.getKey();
- if (value.getEndTime() == null) {
+ if (value.getRestartDuration() == null) {
var targetParallelism =
getTargetParallelismOfScaledVertices(
scalingTimestamp,
scalingHistory);
@@ -92,7 +92,8 @@ public class ScalingTracking {
if (targetParallelismMatchesActual(
targetParallelism, actualParallelism))
{
- value.setEndTime(now);
+ value.setRestartDuration(
+ Duration.between(scalingTimestamp,
now));
LOG.debug(
"Recorded restart duration of {} seconds (from {} till {})",
Duration.between(scalingTimestamp,
now).getSeconds(),
@@ -100,6 +101,10 @@ public class ScalingTracking {
now);
return true;
}
+ } else {
+ LOG.debug(
+ "Cannot record restart duration because already set in the latest record: {}",
+ value.getRestartDuration());
}
return false;
})
@@ -129,7 +134,15 @@ public class ScalingTracking {
var vertexID = entry.getKey();
var targetParallelism = entry.getValue();
var actualParallelism =
actualParallelisms.getOrDefault(vertexID, -1);
- return actualParallelism.equals(targetParallelism);
+ boolean isEqual =
actualParallelism.equals(targetParallelism);
+ if (!isEqual) {
+ LOG.debug(
+ "Vertex {} actual parallelism {} does not match target parallelism {}",
+ vertexID,
+ actualParallelism,
+ targetParallelism);
+ }
+ return isEqual;
});
}
@@ -143,13 +156,12 @@ public class ScalingTracking {
long maxRestartTime = -1;
if (conf.get(AutoScalerOptions.PREFER_TRACKED_RESTART_TIME)) {
for (Map.Entry<Instant, ScalingRecord> entry :
scalingRecords.entrySet()) {
- var startTime = entry.getKey();
- var endTime = entry.getValue().getEndTime();
- if (endTime != null) {
- var restartTime = Duration.between(startTime,
endTime).toSeconds();
- maxRestartTime = Math.max(restartTime, maxRestartTime);
+ var restartDuration = entry.getValue().getRestartDuration();
+ if (restartDuration != null) {
+ maxRestartTime = Math.max(restartDuration.toSeconds(),
maxRestartTime);
}
}
+ LOG.debug("Maximum tracked restart time: {}", maxRestartTime);
}
var restartTimeFromConfig = conf.get(AutoScalerOptions.RESTART_TIME);
long maxRestartTimeFromConfig =
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 0ec2ae8c..d48f059b 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -346,7 +346,7 @@ public class BacklogBasedScalingTest {
}
@Test
- public void shouldTrackEndOfScalingTimeCorrectly() throws Exception {
+ public void shouldTrackRestartDurationCorrectly() throws Exception {
var now = Instant.ofEpochMilli(0);
setClocksTo(now);
metricsCollector.setJobUpdateTs(now);
@@ -406,7 +406,13 @@ public class BacklogBasedScalingTest {
private void assertLastTrackingEndTimeIs(Instant expectedEndTime) throws
Exception {
var scalingTracking = stateStore.getScalingTracking(context);
var latestScalingRecordEntry =
scalingTracking.getLatestScalingRecordEntry().get();
- assertThat(latestScalingRecordEntry.getValue().getEndTime()).isEqualTo(expectedEndTime);
+ var startTime = latestScalingRecordEntry.getKey();
+ var restartDuration =
latestScalingRecordEntry.getValue().getRestartDuration();
+ if (expectedEndTime == null) {
+ assertThat(restartDuration).isNull();
+ } else {
+ assertThat(restartDuration).isEqualTo(Duration.between(startTime,
expectedEndTime));
+ }
}
@Test
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 07662bae..8202cf37 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -288,11 +288,9 @@ public class ScalingMetricEvaluatorTest {
var scalingTracking = new ScalingTracking();
scalingTracking.addScalingRecord(
- Instant.parse("2023-11-15T16:00:00.00Z"),
- new ScalingRecord(Instant.parse("2023-11-15T16:03:00.00Z")));
+ Instant.parse("2023-11-15T16:00:00.00Z"), new
ScalingRecord(Duration.ofMinutes(3)));
scalingTracking.addScalingRecord(
- Instant.parse("2023-11-15T16:20:00.00Z"),
- new ScalingRecord(Instant.parse("2023-11-15T16:25:00.00Z")));
+ Instant.parse("2023-11-15T16:20:00.00Z"), new
ScalingRecord(Duration.ofMinutes(5)));
var restartTimeSec = scalingTracking.getMaxRestartTimeOrDefault(conf);
// Restart time does not factor in
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
index 6a5fcba6..85cb65d8 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingTrackingTest.java
@@ -106,29 +106,35 @@ class ScalingTrackingTest {
}
@Test
- void shouldSetEndTime_WhenParallelismMatches() {
+ void shouldSetRestartDuration_WhenParallelismMatches() {
var now = Instant.now();
- var lastScaling = now.minusSeconds(60);
- addScalingRecordWithoutEndTime(lastScaling);
+ var restartDuration = Duration.ofSeconds(60);
+ var lastScaling = now.minusSeconds(restartDuration.getSeconds());
+ addScalingRecordWithoutRestartDuration(lastScaling);
var actualParallelisms = initActualParallelisms();
var jobTopology = new
JobTopology(createVertexInfoSet(actualParallelisms));
var scalingHistory =
initScalingHistoryWithTargetParallelism(lastScaling,
actualParallelisms);
boolean result =
- scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+ scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
now, jobTopology, scalingHistory);
assertThat(result).isTrue();
- assertThat(scalingTracking.getLatestScalingRecordEntry().get().getValue().getEndTime())
- .isEqualTo(now);
+ assertThat(
+ scalingTracking
+ .getLatestScalingRecordEntry()
+ .get()
+ .getValue()
+ .getRestartDuration())
+ .isEqualTo(restartDuration);
}
@Test
- void shouldNotSetEndTime_WhenParallelismDoesNotMatch() {
+ void shouldNotSetRestartDuration_WhenParallelismDoesNotMatch() {
var now = Instant.now();
var lastScaling = now.minusSeconds(60);
- addScalingRecordWithoutEndTime(lastScaling);
+ addScalingRecordWithoutRestartDuration(lastScaling);
var actualParallelisms = initActualParallelisms();
var jobTopology = new
JobTopology(createVertexInfoSet(actualParallelisms));
var mismatchedParallelisms = new HashMap<>(actualParallelisms);
@@ -137,11 +143,16 @@ class ScalingTrackingTest {
initScalingHistoryWithTargetParallelism(lastScaling,
mismatchedParallelisms);
boolean result =
- scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+ scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
now, jobTopology, scalingHistory);
assertThat(result).isFalse();
- assertThat(scalingTracking.getLatestScalingRecordEntry().get().getValue().getEndTime())
+ assertThat(
+ scalingTracking
+ .getLatestScalingRecordEntry()
+ .get()
+ .getValue()
+ .getRestartDuration())
.isNull();
}
@@ -183,15 +194,13 @@ class ScalingTrackingTest {
private void setUpScalingRecords(Duration secondRescaleDuration) {
scalingTracking.addScalingRecord(
- Instant.parse("2023-11-15T16:00:00.00Z"),
- new ScalingRecord(Instant.parse("2023-11-15T16:03:00.00Z")));
+ Instant.parse("2023-11-15T16:00:00.00Z"), new
ScalingRecord(Duration.ofMinutes(3)));
var secondRecordStart = Instant.parse("2023-11-15T16:20:00.00Z");
scalingTracking.addScalingRecord(
- secondRecordStart,
- new
ScalingRecord(secondRecordStart.plus(secondRescaleDuration)));
+ secondRecordStart, new ScalingRecord(secondRescaleDuration));
}
- private void addScalingRecordWithoutEndTime(Instant startTime) {
+ private void addScalingRecordWithoutRestartDuration(Instant startTime) {
ScalingRecord record = new ScalingRecord();
scalingTracking.addScalingRecord(startTime, record);
}