This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 37bf5107a957110cfa4289858f32a0baa00d2dbf Author: Gyula Fora <[email protected]> AuthorDate: Thu Dec 1 09:00:40 2022 +0100 [FLINK-30260][autoscaler] Add scaling execution logic --- .../operator/autoscaler/ScalingExecutor.java | 362 +++++++++++++++++++++ .../operator/autoscaler/ScalingSummary.java | 51 +++ .../operator/autoscaler/ScalingExecutorTest.java | 350 ++++++++++++++++++++ 3 files changed, 763 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java new file mode 100644 index 00000000..afd9d2fc --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; +import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; + +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; +import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; + +/** Class responsible for executing scaling decisions. */ +public class ScalingExecutor implements Cleanup { + + public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES = + ConfigOptions.key("pipeline.jobvertex-parallelism-overrides") + .mapType() + .defaultValue(Collections.emptyMap()) + .withDescription( + "A parallelism override map (jobVertexId -> parallelism) which will be used to update" + + " the parallelism of the corresponding job vertices of submitted JobGraphs."); + + private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class); + + private final KubernetesClient kubernetesClient; + + private Clock clock = Clock.system(ZoneId.systemDefault()); + + public ScalingExecutor(KubernetesClient kubernetesClient) { + this.kubernetesClient = kubernetesClient; + } + + public boolean scaleResource( + AbstractFlinkResource<?, ?> resource, + AutoScalerInfo scalingInformation, + Configuration conf, + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics) + throws Exception { + + if (!conf.get(SCALING_ENABLED)) { + return false; + } + + if (!stabilizationPeriodPassed(resource, conf)) { + return false; + } + + var scalingHistory = scalingInformation.getScalingHistory(); + var scalingSummaries = computeScalingSummary(conf, evaluatedMetrics, scalingHistory); + if (scalingSummaries.isEmpty()) { + LOG.info("All job vertices are currently running at their target parallelism."); + return false; + } + + if (allVerticesWithinUtilizationTarget(evaluatedMetrics, scalingSummaries)) { + return false; + } + + LOG.info("Scaling vertices:"); + scalingSummaries.forEach( + (v, s) -> + LOG.info( + "{} | Parallelism {} -> {}", + v, + s.getCurrentParallelism(), + s.getNewParallelism())); + + setVertexParallelismOverrides(resource, evaluatedMetrics, scalingSummaries); + + KubernetesClientUtils.replaceSpecAfterScaling(kubernetesClient, resource); + scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries); + + return true; + } + + private boolean stabilizationPeriodPassed( + AbstractFlinkResource<?, ?> resource, Configuration conf) { + var now = clock.instant(); + var startTs = + Instant.ofEpochMilli( + Long.parseLong(resource.getStatus().getJobStatus().getStartTime())); + var stableTime = startTs.plus(conf.get(STABILIZATION_INTERVAL)); + + if (stableTime.isAfter(now)) { + LOG.info("Waiting until {} to stabilize before new scale operation.", stableTime); + return false; + } + return true; + } + + protected static boolean allVerticesWithinUtilizationTarget( + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, + Map<JobVertexID, ScalingSummary> scalingSummaries) { + + for (Map.Entry<JobVertexID, ScalingSummary> entry : scalingSummaries.entrySet()) { + var vertex = entry.getKey(); + var scalingSummary = entry.getValue(); + var metrics = evaluatedMetrics.get(vertex); + + double processingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage(); + double scaleUpRateThreshold = 
metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent(); + double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent(); + + if (processingRate < scaleUpRateThreshold || processingRate > scaleDownRateThreshold) { + LOG.info( + "Vertex {}(pCurr={}, pNew={}) processing rate {} is outside ({}, {})", + vertex, + scalingSummary.getCurrentParallelism(), + scalingSummary.getNewParallelism(), + processingRate, + scaleUpRateThreshold, + scaleDownRateThreshold); + return false; + } else { + LOG.debug( + "Vertex {}(pCurr={}, pNew={}) processing rate {} is within target ({}, {})", + vertex, + scalingSummary.getCurrentParallelism(), + scalingSummary.getNewParallelism(), + processingRate, + scaleUpRateThreshold, + scaleDownRateThreshold); + } + } + LOG.info("All vertex processing rates are within target."); + return true; + } + + private Map<JobVertexID, ScalingSummary> computeScalingSummary( + Configuration conf, + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { + + var out = new HashMap<JobVertexID, ScalingSummary>(); + evaluatedMetrics.forEach( + (v, metrics) -> { + var currentParallelism = + (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); + var newParallelism = + computeScaleTargetParallelism( + conf, + v, + metrics, + scalingHistory.getOrDefault(v, Collections.emptySortedMap())); + if (currentParallelism != newParallelism) { + out.put(v, new ScalingSummary(currentParallelism, newParallelism, metrics)); + } + }); + + return out; + } + + protected int computeScaleTargetParallelism( + Configuration conf, + JobVertexID vertex, + Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, + SortedMap<Instant, ScalingSummary> history) { + + var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent(); + double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage(); + + if (Double.isNaN(averageTrueProcessingRate)) { + LOG.info( + "True processing rate is not available for {}, cannot compute new parallelism", + vertex); + return currentParallelism; + } + + double targetCapacity = + AutoScalerUtils.getTargetProcessingCapacity( + evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true); + if (Double.isNaN(targetCapacity)) { + LOG.info( + "Target data rate is not available for {}, cannot compute new parallelism", + vertex); + return currentParallelism; + } + + LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity); + double scaleFactor = targetCapacity / averageTrueProcessingRate; + double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR); + if (scaleFactor < minScaleFactor) { + LOG.info( + "Computed scale factor of {} for {} is capped by maximum scale down factor to {}", + scaleFactor, + vertex, + minScaleFactor); + scaleFactor = minScaleFactor; + } + + int newParallelism = + scale( + currentParallelism, + (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(), + scaleFactor, + conf.getInteger(VERTEX_MIN_PARALLELISM), + conf.getInteger(VERTEX_MAX_PARALLELISM)); + + if (!history.isEmpty()) { + if (detectImmediateScaleDownAfterScaleUp( + conf, history, currentParallelism, newParallelism)) { + LOG.info( + "Skipping immediate scale down after scale up for {} resetting target parallelism to {}", + vertex, + currentParallelism); + newParallelism = currentParallelism; + } + + // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s + // history + // currentParallelism 1 => 3 -> 
empiricalProcRate = 800 + // empiricalProcRate + upperBoundary < minimumProcRate => don't scale + } + + return newParallelism; + } + + private boolean detectImmediateScaleDownAfterScaleUp( + Configuration conf, + SortedMap<Instant, ScalingSummary> history, + int currentParallelism, + int newParallelism) { + var lastScalingTs = history.lastKey(); + var lastSummary = history.get(lastScalingTs); + + boolean isScaleDown = newParallelism < currentParallelism; + boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism(); + + var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD); + + boolean withinConfiguredTime = + Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative(); + + return isScaleDown && lastScaleUp && withinConfiguredTime; + } + + public static int scale( + int parallelism, + int numKeyGroups, + double scaleFactor, + int minParallelism, + int maxParallelism) { + Preconditions.checkArgument( + minParallelism <= maxParallelism, + "The minimum parallelism must not be greater than the maximum parallelism."); + if (minParallelism > numKeyGroups) { + LOG.warn( + "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.", + minParallelism, + numKeyGroups); + } + if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) { + LOG.warn( + "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.", + maxParallelism, + numKeyGroups); + } + + int newParallelism = + // Prevent integer overflow when converting from double to integer. + // We do not have to detect underflow because doubles cannot + // underflow. 
+ (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE); + + // Cap parallelism at either number of key groups or parallelism limit + final int upperBound = Math.min(numKeyGroups, maxParallelism); + + // Apply min/max parallelism + newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound); + + // Try to adjust the parallelism such that it divides the number of key groups without a + // remainder => state is evenly spread across subtasks + for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) { + if (numKeyGroups % p == 0) { + return p; + } + } + + // If key group adjustment fails, use originally computed parallelism + return newParallelism; + } + + private void setVertexParallelismOverrides( + AbstractFlinkResource<?, ?> resource, + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, + Map<JobVertexID, ScalingSummary> summaries) { + var flinkConf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration()); + var overrides = new HashMap<String, String>(); + evaluatedMetrics.forEach( + (id, metrics) -> { + if (summaries.containsKey(id)) { + overrides.put( + id.toHexString(), + String.valueOf(summaries.get(id).getNewParallelism())); + } else { + overrides.put( + id.toHexString(), + String.valueOf( + (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent())); + } + }); + flinkConf.set(PARALLELISM_OVERRIDES, overrides); + + resource.getSpec().setFlinkConfiguration(flinkConf.toMap()); + } + + @VisibleForTesting + protected void setClock(Clock clock) { + this.clock = Preconditions.checkNotNull(clock); + } + + @Override + public void cleanup(AbstractFlinkResource<?, ?> cr) { + // No cleanup is currently necessary + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java new file mode 100644 index 00000000..7def1716 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** Scaling summary returned by the {@link ScalingMetricEvaluator}. 
*/ +@Data +@NoArgsConstructor +public class ScalingSummary { + + private int currentParallelism; + + private int newParallelism; + + private Map<ScalingMetric, EvaluatedScalingMetric> metrics; + + public ScalingSummary( + int currentParallelism, + int newParallelism, + Map<ScalingMetric, EvaluatedScalingMetric> metrics) { + if (currentParallelism == newParallelism) { + throw new IllegalArgumentException( + "Current parallelism should not be equal to newParallelism during scaling."); + } + this.currentParallelism = currentParallelism; + this.newParallelism = newParallelism; + this.metrics = metrics; + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java new file mode 100644 index 00000000..e819fd89 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for scaling execution logic.
*/ +@EnableKubernetesMockClient(crud = true) +public class ScalingExecutorTest { + + private ScalingExecutor scalingDecisionExecutor; + private Configuration conf; + private KubernetesClient kubernetesClient; + private FlinkDeployment flinkDep; + + @BeforeEach + public void setup() { + scalingDecisionExecutor = new ScalingExecutor(kubernetesClient); + conf = new Configuration(); + conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO); + conf.set(AutoScalerOptions.SCALING_ENABLED, true); + conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + + flinkDep = TestUtils.buildApplicationCluster(); + kubernetesClient.resource(flinkDep).createOrReplace(); + flinkDep.getStatus() + .getJobStatus() + .setStartTime(String.valueOf(System.currentTimeMillis())); + } + + @Test + public void testStabilizationPeriod() throws Exception { + conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMinutes(1)); + + var metrics = Map.of(new JobVertexID(), evaluated(1, 110, 100)); + + var scalingInfo = new AutoScalerInfo(new HashMap<>()); + var clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + flinkDep.getStatus() + .getJobStatus() + .setStartTime(String.valueOf(clock.instant().toEpochMilli())); + + scalingDecisionExecutor.setClock(clock); + assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + + clock = Clock.offset(clock, Duration.ofSeconds(30)); + scalingDecisionExecutor.setClock(clock); + assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + + clock = Clock.offset(clock, Duration.ofSeconds(20)); + scalingDecisionExecutor.setClock(clock); + assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + + clock = Clock.offset(clock, Duration.ofSeconds(20)); + scalingDecisionExecutor.setClock(clock); + assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + + flinkDep.getStatus() + .getJobStatus() + .setStartTime(String.valueOf(clock.instant().toEpochMilli())); + assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + + clock = Clock.offset(clock, Duration.ofSeconds(59)); + scalingDecisionExecutor.setClock(clock); + assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + + clock = Clock.offset(clock, Duration.ofSeconds(2)); + scalingDecisionExecutor.setClock(clock); + assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + } + + @Test + public void testUtilizationBoundaries() { + // Restart time should not affect utilization boundary + conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO); + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + + var flinkDep = TestUtils.buildApplicationCluster(); + kubernetesClient.resource(flinkDep).createOrReplace(); + + var op1 = new JobVertexID(); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + + var evaluated = Map.of(op1, evaluated(1, 70, 100)); + var scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1))); + assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2); + evaluated = Map.of(op1, evaluated(1, 70, 100)); + scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1))); + 
assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); + assertNull(getScaledParallelism(flinkDep)); + + var op2 = new JobVertexID(); + evaluated = + Map.of( + op1, evaluated(1, 70, 100), + op2, evaluated(1, 85, 100)); + scalingSummary = + Map.of( + op1, + new ScalingSummary(1, 2, evaluated.get(op1)), + op2, + new ScalingSummary(1, 2, evaluated.get(op2))); + + assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); + + evaluated = + Map.of( + op1, evaluated(1, 70, 100), + op2, evaluated(1, 70, 100)); + scalingSummary = + Map.of( + op1, + new ScalingSummary(1, 2, evaluated.get(op1)), + op2, + new ScalingSummary(1, 2, evaluated.get(op2))); + assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); + + // Test with backlog based scaling + evaluated = Map.of(op1, evaluated(1, 70, 100, 15)); + scalingSummary = Map.of(op1, new ScalingSummary(1, 2, evaluated.get(op1))); + assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); + } + + @Test + public void testParallelismScaling() { + var op = new JobVertexID(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + assertEquals( + 5, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + assertEquals( + 8, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + assertEquals( + 10, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 80, 100), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + assertEquals( + 8, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 60, 100), Collections.emptySortedMap())); + + assertEquals( + 8, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 59, 100), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5); + assertEquals( + 10, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(2, 100, 40), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); + assertEquals( + 4, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(2, 100, 100), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5); + assertEquals( + 5, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); + + conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6); + assertEquals( + 4, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); + } + + @Test + public void testParallelismComputation() { + final int minParallelism = 1; + final int maxParallelism = Integer.MAX_VALUE; + assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism)); + assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism)); + assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism)); + assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism)); + assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, 
maxParallelism)); + assertEquals( + 720, + ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism)); + } + + @Test + public void testParallelismComputationWithLimit() { + assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700)); + assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700)); + + assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE)); + assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE)); + + assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300)); + assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600)); + } + + @Test + public void ensureMinParallelismDoesNotExceedMax() { + Assert.assertThrows( + IllegalArgumentException.class, + () -> + assertEquals( + 600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499))); + } + + @Test + public void testMinParallelismLimitIsUsed() { + conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5); + assertEquals( + 5, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, + new JobVertexID(), + evaluated(10, 100, 500), + Collections.emptySortedMap())); + } + + @Test + public void testMaxParallelismLimitIsUsed() { + conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + assertEquals( + 10, + scalingDecisionExecutor.computeScaleTargetParallelism( + conf, + new JobVertexID(), + evaluated(10, 500, 100), + Collections.emptySortedMap())); + } + + @Test + public void testScaleDownAfterScaleUpDetection() throws Exception { + var op = new JobVertexID(); + var scalingInfo = new AutoScalerInfo(new HashMap<>()); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1)); + + scalingDecisionExecutor.scaleResource( + flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50))); + assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep)); + + // Should not allow scale back down immediately + scalingDecisionExecutor.scaleResource( + flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100))); + assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep)); + + // Pass some time...
+ var clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61)); + scalingDecisionExecutor.setClock(clock); + scalingDecisionExecutor.scaleResource( + flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100))); + assertEquals(Map.of(op, 5), getScaledParallelism(flinkDep)); + + // Allow immediate scale up + scalingDecisionExecutor.scaleResource( + flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50))); + assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep)); + } + + private Map<ScalingMetric, EvaluatedScalingMetric> evaluated( + int parallelism, double target, double procRate, double catchupRate) { + var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>(); + metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism)); + metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720)); + metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target)); + metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate)); + metrics.put( + ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate)); + ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf); + return metrics; + } + + private Map<ScalingMetric, EvaluatedScalingMetric> evaluated( + int parallelism, double target, double procRate) { + return evaluated(parallelism, target, procRate, 0.); + } + + protected static Map<JobVertexID, Integer> getScaledParallelism( + AbstractFlinkResource<?, ?> resource) { + + var conf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration()); + var overrides = conf.get(PARALLELISM_OVERRIDES); + if (overrides == null || overrides.isEmpty()) { + return null; + } + + var out = new HashMap<JobVertexID, Integer>(); + + overrides.forEach((k, v) -> out.put(JobVertexID.fromHexString(k), Integer.parseInt(v))); + return out; + } +}
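The key-group adjustment in ScalingExecutor#scale is the subtlest part of the patch: after applying the scale factor and the min/max bounds, it searches upward for the next divisor of the vertex max parallelism (up to numKeyGroups / 2) so that key groups spread evenly across subtasks. A minimal standalone sketch, reusing only values already asserted in testParallelismComputation above (the ScaleSketch class name is invented for illustration):

    import org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor;

    public class ScaleSketch {
        public static void main(String[] args) {
            // ceil(1.5 * 16) = 24 is raised to 32, the next divisor of 128
            System.out.println(ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE)); // 32
            // ceil(0.8 * 6) = 5 already divides 720, so it is kept as-is
            System.out.println(ScalingExecutor.scale(6, 720, 0.8, 1, Integer.MAX_VALUE)); // 5
            // ceil(2 * 200) = 400 exceeds 720 / 2, so the divisor search is skipped
            System.out.println(ScalingExecutor.scale(200, 720, 2, 1, Integer.MAX_VALUE)); // 400
        }
    }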

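For completeness: the map that setVertexParallelismOverrides writes into the spec is an ordinary map-type Flink option, so it can be read back the same way the test helper getScaledParallelism does. A hedged round-trip sketch under the assumption that Flink renders map options as comma-separated key:value pairs (the vertex ID literal and the OverrideSketch class name are invented for illustration):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor;

    import java.util.Map;

    public class OverrideSketch {
        public static void main(String[] args) {
            // Raw form as it would appear in the resource's flinkConfiguration
            var flinkConf = Configuration.fromMap(Map.of(
                    "pipeline.jobvertex-parallelism-overrides",
                    "bc764cd8ddf7a0cff126f51c16239658:4"));
            // Parses back into a jobVertexId (hex string) -> parallelism map
            Map<String, String> overrides = flinkConf.get(ScalingExecutor.PARALLELISM_OVERRIDES);
            overrides.forEach((vertexId, p) -> System.out.println(vertexId + " -> " + p));
        }
    }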