This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 6c6c44e7072abb534b5117e485fa2806548a3b96 Author: Rui Fan <fan...@apache.org> AuthorDate: Mon Dec 16 16:52:22 2024 +0800 [hotfix] Refactor autoscaler related `setClock` to optimize testing --- .../main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java | 2 ++ .../src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java | 6 ++++++ .../java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java | 1 - .../org/apache/flink/autoscaler/RecommendedParallelismTest.java | 1 - 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index d0e29e67..d8ee54ab 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -256,5 +256,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> @VisibleForTesting void setClock(Clock clock) { this.clock = Preconditions.checkNotNull(clock); + this.metricsCollector.setClock(clock); + this.scalingExecutor.setClock(clock); } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java index 248fec71..af8384c4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collections; @@ -494,4 +495,9 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> { return !scaleEnabled || isExcluded; } + + @VisibleForTesting + void setClock(Clock clock) { + jobVertexScaler.setClock(clock); + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index 4599d634..df950bb4 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -417,7 +417,6 @@ public class BacklogBasedScalingTest { private void setClocksTo(Instant time) { var clock = Clock.fixed(time, ZoneId.systemDefault()); - metricsCollector.setClock(clock); autoscaler.setClock(clock); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index 8637e126..f34aa1fc 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -279,7 +279,6 @@ public class RecommendedParallelismTest { private void setClocksTo(Instant time) { var clock = Clock.fixed(time, ZoneId.systemDefault()); - metricsCollector.setClock(clock); autoscaler.setClock(clock); } }