This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 2bd6b1b38171ce821509d46d687291b876f72a2e Author: Rui Fan <[email protected]> AuthorDate: Mon Jan 22 15:24:40 2024 +0800 [FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval` --- .../apache/flink/autoscaler/JobAutoScalerImpl.java | 22 +++++++++++----------- .../flink/autoscaler/ScalingMetricCollector.java | 4 ++-- .../apache/flink/autoscaler/ScalingTracking.java | 12 +++++++----- .../autoscaler/metrics/CollectedMetricHistory.java | 1 + .../flink/autoscaler/BacklogBasedScalingTest.java | 5 ++++- .../autoscaler/ScalingMetricEvaluatorTest.java | 20 ++++++++++---------- 6 files changed, 35 insertions(+), 29 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index 04ab36e7..b3839ff4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -160,14 +160,23 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore); var jobTopology = collectedMetrics.getJobTopology(); + var now = clock.instant(); + var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now); + var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now); + // A scaling tracking without an end time gets created whenever a scaling decision is + // applied. Here, we record the end time for it (runScalingLogic is only called when the job + // transitions back into the RUNNING state). 
+ if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches( + collectedMetrics.getJobRunningTs(), jobTopology, scalingHistory)) { + stateStore.storeScalingTracking(ctx, scalingTracking); + } + if (collectedMetrics.getMetricHistory().isEmpty()) { return; } LOG.debug("Collected metrics: {}", collectedMetrics); - var now = clock.instant(); // Scaling tracking data contains previous restart times that are taken into account - var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now); var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration()); var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime); @@ -179,15 +188,6 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> jobTopology.getVerticesInTopologicalOrder(), () -> lastEvaluatedMetrics.get(ctx.getJobKey())); - var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now); - // A scaling tracking without an end time gets created whenever a scaling decision is - // applied. Here, we record the end time for it (runScalingLogic is only called when the job - // transitions back into the RUNNING state). - if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches( - now, jobTopology, scalingHistory)) { - stateStore.storeScalingTracking(ctx, scalingTracking); - } - if (!collectedMetrics.isFullyCollected()) { // We have done an upfront evaluation, but we are not ready for scaling. 
resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics()); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index fce91ea4..ed5a2e47 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -149,10 +149,10 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC if (isStabilizing) { LOG.info("Stabilizing until {}", readable(stableTime)); stateStore.storeCollectedMetrics(ctx, metricHistory); - return new CollectedMetricHistory(topology, Collections.emptySortedMap()); + return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs); } - var collectedMetrics = new CollectedMetricHistory(topology, metricHistory); + var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs); if (now.isBefore(windowFullTime)) { LOG.info("Metric window not full until {}", readable(windowFullTime)); } else { diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java index 0f11445b..af8a2f40 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java @@ -69,7 +69,8 @@ public class ScalingTracking { * Sets restart duration for the latest scaling record if its parallelism matches the current * job parallelism. * - * @param now The instant to be used as the end time when calculating the restart duration. + * @param jobRunningTs The instant when the JobStatus is switched to RUNNING, it will be used as + * the end time when calculating the restart duration. 
* @param jobTopology The current job topology containing details of the job's parallelism. * @param scalingHistory The scaling history. * @return true if the restart duration is successfully recorded, false if the restart duration @@ -77,7 +78,7 @@ public class ScalingTracking { * not match the actual parallelism. */ public boolean recordRestartDurationIfTrackedAndParallelismMatches( - Instant now, + Instant jobRunningTs, JobTopology jobTopology, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { return getLatestScalingRecordEntry() @@ -94,12 +95,13 @@ public class ScalingTracking { if (targetParallelismMatchesActual( targetParallelism, actualParallelism)) { value.setRestartDuration( - Duration.between(scalingTimestamp, now)); + Duration.between(scalingTimestamp, jobRunningTs)); LOG.debug( "Recorded restart duration of {} seconds (from {} till {})", - Duration.between(scalingTimestamp, now).getSeconds(), + Duration.between(scalingTimestamp, jobRunningTs) + .getSeconds(), scalingTimestamp, - now); + jobRunningTs); return true; } } else { diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java index 31a19879..442c6666 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java @@ -30,5 +30,6 @@ import java.util.SortedMap; public class CollectedMetricHistory { final JobTopology jobTopology; final SortedMap<Instant, CollectedMetrics> metricHistory; + final Instant jobRunningTs; @Setter private boolean fullyCollected; } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index d48f059b..bd71dba1 100644 --- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -398,9 +398,12 @@ public class BacklogBasedScalingTest { new JobTopology( new VertexInfo(source1, Set.of(), 4, 720), new VertexInfo(sink, Set.of(source1), 4, 720))); + + var expectedEndTime = Instant.ofEpochMilli(10); + metricsCollector.setJobUpdateTs(expectedEndTime); autoscaler.scale(context); - assertLastTrackingEndTimeIs(now); + assertLastTrackingEndTimeIs(expectedEndTime); } private void assertLastTrackingEndTimeIs(Instant expectedEndTime) throws Exception { diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 260687e0..0bfbd989 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -118,7 +118,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), Duration.ZERO) .getVertexMetrics(); @@ -152,7 +152,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), Duration.ZERO) .getVertexMetrics(); assertEquals( @@ -175,7 +175,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), Duration.ZERO) .getVertexMetrics(); assertEquals( @@ -197,7 +197,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), 
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()), Duration.ZERO) .getVertexMetrics(); assertEquals( @@ -243,7 +243,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), Duration.ZERO) .getVertexMetrics(); assertEquals( @@ -461,7 +461,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), restartTime) .getVertexMetrics() .get(source) @@ -476,7 +476,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), restartTime) .getVertexMetrics() .get(source) @@ -489,7 +489,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), restartTime) .getVertexMetrics() .get(source) @@ -531,7 +531,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), restartTime) .getVertexMetrics() .get(source) @@ -562,7 +562,7 @@ public class ScalingMetricEvaluatorTest { evaluator .evaluate( conf, - new CollectedMetricHistory(topology, metricHistory), + new CollectedMetricHistory(topology, metricHistory, Instant.now()), restartTime) .getVertexMetrics() .get(source)
