This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 0b3cc1312e9070ac0a28cea76e1c6d9284f52acd Author: Gyula Fora <[email protected]> AuthorDate: Thu Dec 1 08:59:46 2022 +0100 [FLINK-30260][autoscaler] Add scaling metric evaluation logic --- .../autoscaler/ScalingMetricEvaluator.java | 228 +++++++++++++ .../autoscaler/BacklogBasedScalingTest.java | 352 +++++++++++++++++++++ .../autoscaler/ScalingMetricEvaluatorTest.java | 227 +++++++++++++ 3 files changed, 807 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java new file mode 100644 index 00000000..fdf5e33f --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.math3.stat.StatUtils; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; + +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; 
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; + +/** Job scaling evaluator for autoscaler. */ +public class ScalingMetricEvaluator { + + private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class); + + private Clock clock = Clock.systemDefaultZone(); + + public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate( + Configuration conf, CollectedMetrics collectedMetrics) { + + var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>(); + var metricsHistory = collectedMetrics.getMetricHistory(); + var topology = collectedMetrics.getJobTopology(); + + for (var vertex : topology.getVerticesInTopologicalOrder()) { + scalingOutput.put( + vertex, + computeVertexScalingSummary( + conf, scalingOutput, metricsHistory, topology, vertex)); + } + + return scalingOutput; + } + + @NotNull + private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary( + Configuration conf, + HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput, + SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory, + JobTopology topology, + JobVertexID vertex) { + + var latestVertexMetrics = metricsHistory.get(metricsHistory.lastKey()).get(vertex); + + var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>(); + computeTargetDataRate( + topology, + vertex, + conf, + scalingOutput, + metricsHistory, + latestVertexMetrics, + evaluatedMetrics); + + evaluatedMetrics.put( + TRUE_PROCESSING_RATE, + new EvaluatedScalingMetric( + latestVertexMetrics.get(TRUE_PROCESSING_RATE), + 
getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory, conf))); + + evaluatedMetrics.put( + PARALLELISM, EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex))); + evaluatedMetrics.put( + MAX_PARALLELISM, + EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex))); + + computeProcessingRateThresholds(evaluatedMetrics, conf); + + var isSink = topology.getOutputs().get(vertex).isEmpty(); + if (!isSink) { + evaluatedMetrics.put( + TRUE_OUTPUT_RATE, + new EvaluatedScalingMetric( + latestVertexMetrics.get(TRUE_OUTPUT_RATE), + getAverage(TRUE_OUTPUT_RATE, vertex, metricsHistory, conf))); + evaluatedMetrics.put( + OUTPUT_RATIO, + new EvaluatedScalingMetric( + latestVertexMetrics.get(OUTPUT_RATIO), + getAverage(OUTPUT_RATIO, vertex, metricsHistory, conf))); + } + + return evaluatedMetrics; + } + + @VisibleForTesting + protected static void computeProcessingRateThresholds( + Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf) { + + double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY); + + double scaleUpThreshold = + AutoScalerUtils.getTargetProcessingCapacity( + metrics, conf, conf.get(TARGET_UTILIZATION) + utilizationBoundary, false); + + double scaleDownThreshold = + AutoScalerUtils.getTargetProcessingCapacity( + metrics, conf, conf.get(TARGET_UTILIZATION) - utilizationBoundary, true); + + metrics.put(SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleUpThreshold)); + metrics.put(SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleDownThreshold)); + } + + private void computeTargetDataRate( + JobTopology topology, + JobVertexID vertex, + Configuration conf, + HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated, + SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory, + Map<ScalingMetric, Double> latestVertexMetrics, + Map<ScalingMetric, EvaluatedScalingMetric> out) { + + boolean isSource = topology.getInputs().get(vertex).isEmpty(); + if 
(isSource) { + double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); + + var sourceRateMetric = + latestVertexMetrics.containsKey(TARGET_DATA_RATE) + ? TARGET_DATA_RATE + : SOURCE_DATA_RATE; + if (!latestVertexMetrics.containsKey(sourceRateMetric)) { + throw new RuntimeException( + "Cannot evaluate metrics without source target rate information"); + } + + out.put( + TARGET_DATA_RATE, + new EvaluatedScalingMetric( + latestVertexMetrics.get(sourceRateMetric), + getAverage(sourceRateMetric, vertex, metricsHistory, conf))); + + double lag = latestVertexMetrics.getOrDefault(LAG, 0.); + double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec; + if (catchUpInputRate > 0) { + LOG.info( + "Extra backlog processing input rate for {} is {}", + vertex, + catchUpInputRate); + } + out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate)); + } else { + var inputs = topology.getInputs().get(vertex); + double sumCurrentTargetRate = 0; + double sumAvgTargetRate = 0; + double sumCatchUpDataRate = 0; + for (var inputVertex : inputs) { + var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex); + var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE); + var outputRateMultiplier = inputEvaluatedMetrics.get(OUTPUT_RATIO).getAverage(); + sumCurrentTargetRate += inputTargetRate.getCurrent() * outputRateMultiplier; + sumAvgTargetRate += inputTargetRate.getAverage() * outputRateMultiplier; + sumCatchUpDataRate += + inputEvaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent() + * outputRateMultiplier; + } + out.put( + TARGET_DATA_RATE, + new EvaluatedScalingMetric(sumCurrentTargetRate, sumAvgTargetRate)); + out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate)); + } + } + + private double getAverage( + ScalingMetric metric, + JobVertexID jobVertexId, + SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory, + Configuration conf) { + return StatUtils.mean( + 
metricsHistory + .tailMap(clock.instant().minus(conf.get(AutoScalerOptions.METRICS_WINDOW))) + .values().stream() + .map(m -> m.get(jobVertexId)) + .filter(m -> m.containsKey(metric)) + .mapToDouble(m -> m.get(metric)) + .filter(d -> !Double.isNaN(d)) + .toArray()); + } + + @VisibleForTesting + protected void setClock(Clock clock) { + this.clock = Preconditions.checkNotNull(clock); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java new file mode 100644 index 00000000..1e1466c8 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Test for backlog based scaling logic. 
*/ +@EnableKubernetesMockClient(crud = true) +public class BacklogBasedScalingTest { + + private ScalingMetricEvaluator evaluator; + private TestingFlinkService service; + private TestingMetricsCollector metricsCollector; + private ScalingExecutor scalingExecutor; + + private FlinkDeployment app; + private JobVertexID source1, sink; + + private FlinkConfigManager confManager; + private JobAutoScaler autoscaler; + + private KubernetesClient kubernetesClient; + + @BeforeEach + public void setup() { + evaluator = new ScalingMetricEvaluator(); + scalingExecutor = new ScalingExecutor(kubernetesClient); + service = new TestingFlinkService(); + + app = TestUtils.buildApplicationCluster(); + app.getMetadata().setGeneration(1L); + app.getStatus().getJobStatus().setJobId(new JobID().toHexString()); + kubernetesClient.resource(app).createOrReplace(); + + source1 = new JobVertexID(); + sink = new JobVertexID(); + + metricsCollector = + new TestingMetricsCollector( + new JobTopology( + new VertexInfo(source1, Set.of(), 1, 720), + new VertexInfo(sink, Set.of(source1), 1, 720))); + + var defaultConf = new Configuration(); + defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true); + defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO); + defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1)); + defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2)); + defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); + defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); + defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8); + defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO); + + confManager = new FlinkConfigManager(defaultConf); + ReconciliationUtils.updateStatusForDeployedSpec( + app, confManager.getDeployConfig(app.getMetadata(), app.getSpec())); + app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name()); + + 
autoscaler = + new JobAutoScaler( + kubernetesClient, + confManager, + metricsCollector, + evaluator, + scalingExecutor, + TestUtils.createTestMetricGroup(new Configuration())); + } + + @Test + public void test() throws Exception { + var ctx = createAutoscalerTestContext(); + var now = Instant.now(); + setClocksTo(now); + app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli())); + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 2000.)), + sink, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 500.)))); + + autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx); + assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty()); + + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx); + + var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(4, scaledParallelism.get(source1)); + assertEquals(4, scaledParallelism.get(sink)); + + metricsCollector.setJobTopology( + new JobTopology( + new VertexInfo(source1, Set.of(), 4, 24), + new VertexInfo(sink, Set.of(source1), 4, 720))); + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new 
AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 2500.)), + sink, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 1800.)))); + + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli())); + autoscaler.scale( + app, service, confManager.getObserveConfig(app), createAutoscalerTestContext()); + assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty()); + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(4, scaledParallelism.get(source1)); + assertEquals(4, scaledParallelism.get(sink)); + + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 1200.)), + sink, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 1800.)))); + + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + autoscaler.scale( + app, service, confManager.getObserveConfig(app), createAutoscalerTestContext()); + assertFalse(AutoScalerInfo.forResource(app, 
kubernetesClient).getMetricHistory().isEmpty()); + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(4, scaledParallelism.get(source1)); + assertEquals(4, scaledParallelism.get(sink)); + + // We have finally caught up to our original lag, time to scale down + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)), + sink, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 800.)))); + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + autoscaler.scale( + app, service, confManager.getObserveConfig(app), createAutoscalerTestContext()); + + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(2, scaledParallelism.get(source1)); + assertEquals(2, scaledParallelism.get(sink)); + metricsCollector.setJobTopology( + new JobTopology( + new VertexInfo(source1, Set.of(), 2, 24), + new VertexInfo(sink, Set.of(source1), 2, 720))); + + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.)), + sink, 
+ Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 900.)))); + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli())); + autoscaler.scale( + app, service, confManager.getObserveConfig(app), createAutoscalerTestContext()); + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(2, scaledParallelism.get(source1)); + assertEquals(2, scaledParallelism.get(sink)); + + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 100.)), + sink, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 900.)))); + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + autoscaler.scale( + app, service, confManager.getObserveConfig(app), createAutoscalerTestContext()); + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(2, scaledParallelism.get(source1)); + assertEquals(2, scaledParallelism.get(sink)); + + metricsCollector.setCurrentMetrics( + Map.of( + source1, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.), + 
FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.), + FlinkMetric.PENDING_RECORDS, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)), + sink, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric( + "", Double.NaN, Double.NaN, Double.NaN, 500.)))); + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + autoscaler.scale( + app, service, confManager.getObserveConfig(app), createAutoscalerTestContext()); + scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); + assertEquals(2, scaledParallelism.get(source1)); + assertEquals(2, scaledParallelism.get(sink)); + } + + private void setClocksTo(Instant time) { + var clock = Clock.fixed(time, ZoneId.systemDefault()); + metricsCollector.setClock(clock); + evaluator.setClock(clock); + scalingExecutor.setClock(clock); + } + + @NotNull + private TestUtils.TestingContext<HasMetadata> createAutoscalerTestContext() { + return new TestUtils.TestingContext<>() { + public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) { + return (Set) + kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream() + .collect(Collectors.toSet()); + } + }; + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java new file mode 100644 index 00000000..7a4af640 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Scaling evaluator test. 
*/ +public class ScalingMetricEvaluatorTest { + + @Test + public void testLagBasedSourceScaling() { + var source = new JobVertexID(); + var sink = new JobVertexID(); + + var topology = + new JobTopology( + new VertexInfo(source, Collections.emptySet(), 1, 1), + new VertexInfo(sink, Set.of(source), 1, 1)); + + var evaluator = new ScalingMetricEvaluator(); + + var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>(); + + metricHistory.put( + Instant.now(), + Map.of( + source, + Map.of( + SOURCE_DATA_RATE, + 100., + LAG, + 0., + OUTPUT_RATIO, + 2., + TRUE_OUTPUT_RATE, + 200., + TRUE_PROCESSING_RATE, + 200.), + sink, + Map.of(TRUE_PROCESSING_RATE, 2000.))); + + metricHistory.put( + Instant.now(), + Map.of( + source, + Map.of( + SOURCE_DATA_RATE, 200., + LAG, 1000., + OUTPUT_RATIO, 2., + TRUE_OUTPUT_RATE, 200., + TRUE_PROCESSING_RATE, 200.), + sink, + Map.of(TRUE_PROCESSING_RATE, 2000.))); + + var conf = new Configuration(); + + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2)); + conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO); + var evaluatedMetrics = + evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory)); + assertEquals( + new EvaluatedScalingMetric(200, 150), + evaluatedMetrics.get(source).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(500), + evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE)); + assertEquals( + new EvaluatedScalingMetric(400, 300), + evaluatedMetrics.get(sink).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(1000), + evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE)); + + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1)); + evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory)); + assertEquals( + new EvaluatedScalingMetric(200, 150), + evaluatedMetrics.get(source).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(1000), + 
evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE)); + assertEquals( + new EvaluatedScalingMetric(400, 300), + evaluatedMetrics.get(sink).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(2000), + evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE)); + + // Restart time should not affect evaluated metrics + conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2)); + + evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory)); + assertEquals( + new EvaluatedScalingMetric(200, 150), + evaluatedMetrics.get(source).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(1000), + evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE)); + assertEquals( + new EvaluatedScalingMetric(400, 300), + evaluatedMetrics.get(sink).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(2000), + evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE)); + + // Turn off lag based scaling + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory)); + assertEquals( + new EvaluatedScalingMetric(200, 150), + evaluatedMetrics.get(source).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0), evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE)); + assertEquals( + new EvaluatedScalingMetric(400, 300), + evaluatedMetrics.get(sink).get(TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0), evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE)); + + // Test 0 lag + metricHistory.clear(); + metricHistory.put( + Instant.now(), + Map.of( + source, + Map.of( + SOURCE_DATA_RATE, + 100., + LAG, + 0., + OUTPUT_RATIO, + 2., + TRUE_OUTPUT_RATE, + 200., + TRUE_PROCESSING_RATE, + 200.), + sink, + Map.of(TRUE_PROCESSING_RATE, 2000.))); + + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1)); + evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory)); + 
assertEquals( + new EvaluatedScalingMetric(100, 100), + evaluatedMetrics.get(source).get(TARGET_DATA_RATE)); + assertEquals( + new EvaluatedScalingMetric(200, 200), + evaluatedMetrics.get(sink).get(TARGET_DATA_RATE)); + } + + @Test + public void testUtilizationBoundaryComputation() { + + var conf = new Configuration(); + conf.set(TARGET_UTILIZATION, 0.8); + conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(RESTART_TIME, Duration.ofSeconds(1)); + conf.set(CATCH_UP_DURATION, Duration.ZERO); + + // Default behaviour, restart time does not factor in + assertEquals(Tuple2.of(778.0, 1000.0), getThresholds(700, 0, conf)); + + conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2)); + assertEquals(Tuple2.of(1128.0, 1700.0), getThresholds(700, 350, conf)); + assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf)); + } + + private Tuple2<Double, Double> getThresholds( + double inputTargetRate, double catchUpRate, Configuration conf) { + var map = new HashMap<ScalingMetric, EvaluatedScalingMetric>(); + + map.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN, inputTargetRate)); + map.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate)); + + ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf); + return Tuple2.of( + map.get(SCALE_UP_RATE_THRESHOLD).getCurrent(), + map.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent()); + } +}
