This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ecc15a36 [FLINK-33771] Extract method to estimate task slots and add dedicated test (#773)
ecc15a36 is described below
commit ecc15a365714b50810c48270b9a192a3ec26a323
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Feb 7 11:38:57 2024 +0100
[FLINK-33771] Extract method to estimate task slots and add dedicated test (#773)
---
.../apache/flink/autoscaler/ScalingExecutor.java | 38 ++---------
.../flink/autoscaler/utils/ResourceCheckUtils.java | 78 ++++++++++++++++++++++
.../autoscaler/utils/ResourceCheckUtilsTest.java | 76 +++++++++++++++++++++
3 files changed, 160 insertions(+), 32 deletions(-)
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index d85b311e..094d4680 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -27,6 +27,7 @@ import org.apache.flink.autoscaler.resources.NoopResourceCheck;
import org.apache.flink.autoscaler.resources.ResourceCheck;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.CalendarUtils;
+import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -278,38 +279,11 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
return true;
}
- var vertexMetrics = evaluatedMetrics.getVertexMetrics();
-
- int oldParallelismSum =
- vertexMetrics.values().stream()
- .map(map -> (int) map.get(ScalingMetric.PARALLELISM).getCurrent())
- .reduce(0, Integer::sum);
-
- Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
- for (Map.Entry<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> entry :
- vertexMetrics.entrySet()) {
- JobVertexID jobVertexID = entry.getKey();
- ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
- if (scalingSummary != null) {
- newParallelisms.put(jobVertexID, scalingSummary.getNewParallelism());
- } else {
- newParallelisms.put(
- jobVertexID,
- (int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
- }
- }
-
- double numTaskSlotsUsed = globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
-
- final int numTaskSlotsAfterRescale;
- if (oldParallelismSum >= numTaskSlotsUsed) {
- // Slot sharing is (partially) deactivated,
- // assuming no slot sharing in absence of additional data.
- numTaskSlotsAfterRescale = newParallelisms.values().stream().reduce(0, Integer::sum);
- } else {
- // Slot sharing is activated
- numTaskSlotsAfterRescale = newParallelisms.values().stream().reduce(0, Integer::max);
- }
+ int numTaskSlotsUsed =
+ (int) globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+ final int numTaskSlotsAfterRescale =
+ ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+ evaluatedMetrics.getVertexMetrics(), scalingSummaries, numTaskSlotsUsed);
int taskSlotsPerTm = ctx.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS);
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
new file mode 100644
index 00000000..18bf58c0
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utils methods for resource checks. */
+public class ResourceCheckUtils {
+
+ public static int estimateNumTaskSlotsAfterRescale(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics,
+ Map<JobVertexID, ScalingSummary> scalingSummaries,
+ int numTaskSlotsUsed) {
+
+ Map<JobVertexID, Integer> newParallelisms =
+ computeNewParallelisms(scalingSummaries, vertexMetrics);
+
+ if (currentMaxParallelism(vertexMetrics) == numTaskSlotsUsed) {
+ // Slot sharing is activated
+ return newParallelisms.values().stream().reduce(0, Integer::max);
+ } else {
+ // Slot sharing is (partially) deactivated,
+ // assuming no slot sharing in absence of additional metrics.
+ return newParallelisms.values().stream().reduce(0, Integer::sum);
+ }
+ }
+
+ private static Map<JobVertexID, Integer> computeNewParallelisms(
+ Map<JobVertexID, ScalingSummary> scalingSummaries,
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics) {
+
+ Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+
+ for (Map.Entry<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> entry :
+ vertexMetrics.entrySet()) {
+ JobVertexID jobVertexID = entry.getKey();
+ ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+ if (scalingSummary != null) {
+ newParallelisms.put(jobVertexID, scalingSummary.getNewParallelism());
+ } else {
+ newParallelisms.put(
+ jobVertexID,
+ (int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+ }
+ }
+
+ return newParallelisms;
+ }
+
+ private static int currentMaxParallelism(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics) {
+
+ return vertexMetrics.values().stream()
+ .map(map -> (int) map.get(ScalingMetric.PARALLELISM).getCurrent())
+ .reduce(0, Integer::max);
+ }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java
new file mode 100644
index 00000000..585b5724
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link ResourceCheckUtils}. */
+class ResourceCheckUtilsTest {
+
+ @Test
+ void testEstimateNumTaskSlotsAfterRescale() {
+ var source = new JobVertexID();
+ var sink = new JobVertexID();
+ int sourceParallelism = 2;
+ int sinkParallelism = 3;
+ var metrics =
+ Map.of(
+ source,
+ Map.of(
+ ScalingMetric.PARALLELISM,
+ EvaluatedScalingMetric.of(sourceParallelism)),
+ sink,
+ Map.of(
+ ScalingMetric.PARALLELISM,
+ EvaluatedScalingMetric.of(sinkParallelism)));
+
+ Map<JobVertexID, ScalingSummary> scalingSummaries =
+ Map.of(source, new ScalingSummary(sourceParallelism, 7, Map.of()));
+
+ // With slot sharing, the max parallelism determines the number of task slots required
+ int numTaskSlotsUsed = Math.max(sourceParallelism, sinkParallelism);
+ assertThat(
+ ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+ metrics, scalingSummaries, numTaskSlotsUsed))
+ .isEqualTo(7);
+
+ // Slot sharing disabled, the number of task slots equals the sum of all parallelisms
+ numTaskSlotsUsed = sourceParallelism + sinkParallelism;
+ assertThat(
+ ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+ metrics, scalingSummaries, numTaskSlotsUsed))
+ .isEqualTo(10);
+
+ // Slot sharing partially disabled, for lack of a better metric, assume slot sharing is
+ // disabled
+ numTaskSlotsUsed = numTaskSlotsUsed - 1;
+ assertThat(
+ ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+ metrics, scalingSummaries, numTaskSlotsUsed))
+ .isEqualTo(10);
+ }
+}