mxm commented on code in PR #847:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/847#discussion_r1666386431
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -95,13 +106,23 @@ public int computeScaleTargetParallelism(
LOG.warn(
"Target data rate is not available for {}, cannot compute
new parallelism",
vertex);
- return currentParallelism;
+ return VertexScalingResult.normalScaling(currentParallelism);
}
+ targetCapacity *= backpropagationScaleFactor;
+
LOG.debug("Target processing capacity for {} is {}", vertex,
targetCapacity);
double scaleFactor = targetCapacity / averageTrueProcessingRate;
- double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+
+ // if bottleneck propagation is applied and scaling down is forbidden,
limit minScaleFactor
+ // with 1
+ if (backpropagationScaleFactor < 1.0
+ &&
!conf.getBoolean(BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED)) {
+ minScaleFactor = 1.0;
+ }
Review Comment:
```suggestion
if (!conf.getBoolean(BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED) {
minScaleFactor = Math.max(minScaleFactor, 1.0);
}
```
Just a stylistic suggestion. Feel free to ignore.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/VertexScalingResult.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/** Class for storing information on how a single vertex is scaled. */
+@AllArgsConstructor
+@Getter
+public class VertexScalingResult {
+ private int parallelism;
+ private double bottleneckScaleFactor;
+ private boolean isBottleneck;
Review Comment:
Do we need the `isBottleneck` field? We can add a method:
```java
public boolean isBottleneck() {
return bottleneckScaleFactor < 1.0;
}
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -71,21 +72,31 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context>
autoScalerEventHandl
this.autoScalerEventHandler = autoScalerEventHandler;
}
- public int computeScaleTargetParallelism(
+ public VertexScalingResult computeScaleTargetParallelism(
Context context,
JobVertexID vertex,
Collection<ShipStrategy> inputShipStrategies,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
SortedMap<Instant, ScalingSummary> history,
- Duration restartTime) {
+ Duration restartTime,
+ double backpropagationScaleFactor) {
Review Comment:
If I'm not mistaken, this factor is always 1.0 in the production code path.
If so, I'd prefer removing it.
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java:
##########
@@ -643,6 +643,334 @@ public void testScalingUnderGcPressure() throws Exception
{
assertTrue(eventCollector.events.isEmpty());
}
+ @Test
+ public void testScalingWithBackPropEnabledSimpleGraph() throws Exception {
+ var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+ var source = JobVertexID.fromHexString(sourceHexString);
+ var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
+ var filterOperator =
JobVertexID.fromHexString(filterOperatorHexString);
+ var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+ var sink = JobVertexID.fromHexString(sinkHexString);
+
+ JobTopology jobTopology =
+ new JobTopology(
+ new VertexInfo(source, Map.of(), 1, 10, false, null),
+ new VertexInfo(
+ filterOperator, Map.of(source, REBALANCE), 1,
5, false, null),
+ new VertexInfo(sink, Map.of(filterOperator, HASH), 1,
10, false, null));
+
+ var conf = context.getConfiguration();
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
+ conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, true);
+
+ // If back propagation is enabled, then parallelism of all vertices is
5
+ var metrics =
+ new EvaluatedMetrics(
+ Map.of(
+ source,
+ evaluatedWithMaxParallelism(1, 100, 10, 10),
+ filterOperator,
+ evaluatedWithMaxParallelism(1, 100, 10, 5),
+ sink,
+ evaluatedWithMaxParallelism(1, 100, 10, 10)),
+ dummyGlobalMetrics);
+ var now = Instant.now();
+ assertThat(
+ scalingExecutor.scaleResource(
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ jobTopology))
+ .isTrue();
+
+ Map<String, String> parallelismOverrides =
stateStore.getParallelismOverrides(context);
+
+ // Check if backpropagation has effect on another vertices
+ assertThat(parallelismOverrides)
+ .containsAllEntriesOf(
+ Map.of(
+ "0bfd135746ac8efb3cce668b12e16d3a",
+ "5",
+ "869fb403873411306404e9f2e4438c0e",
+ "5",
+ "a6b7102b8d3e3a9564998c1ffeb5e2b7",
+ "5"));
+
+ conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, false);
+ now = Instant.now();
+ assertThat(
+ scalingExecutor.scaleResource(
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ jobTopology))
+ .isTrue();
+
+ parallelismOverrides = stateStore.getParallelismOverrides(context);
+
+ assertThat(parallelismOverrides)
+ .containsAllEntriesOf(
+ Map.of(
+ "0bfd135746ac8efb3cce668b12e16d3a",
+ "10",
+ "869fb403873411306404e9f2e4438c0e",
+ "5",
+ "a6b7102b8d3e3a9564998c1ffeb5e2b7",
+ "10"));
+
+ conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, true);
+
+ jobTopology =
+ new JobTopology(
+ new VertexInfo(source, Map.of(), 1, 11, false, null),
+ new VertexInfo(
+ filterOperator, Map.of(source, REBALANCE), 1,
6, false, null),
+ new VertexInfo(sink, Map.of(filterOperator, HASH), 1,
11, false, null));
+
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(
+ source,
+ evaluatedWithMaxParallelism(1, 100, 10, 11),
+ filterOperator,
+ evaluatedWithMaxParallelism(1, 100, 10, 6),
+ sink,
+ evaluatedWithMaxParallelism(1, 100, 10, 11)),
+ dummyGlobalMetrics);
+ now = Instant.now();
+ assertThat(
+ scalingExecutor.scaleResource(
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ jobTopology))
+ .isTrue();
+
+ parallelismOverrides = stateStore.getParallelismOverrides(context);
+
+ // Check max parallelism of the bottleneck vertex affects on other
vertices parallelism
+ assertThat(parallelismOverrides)
+ .containsAllEntriesOf(
+ Map.of(
+ "0bfd135746ac8efb3cce668b12e16d3a",
+ "6",
+ "869fb403873411306404e9f2e4438c0e",
+ "6",
+ "a6b7102b8d3e3a9564998c1ffeb5e2b7",
+ "6"));
+ }
+
+ @Test
+ public void testScalingWithBackPropEnabledComplexGraph() throws Exception {
+ var source1HexString = "0bfd135746ac8efb3cce668b12e16d3a";
+ var source1 = JobVertexID.fromHexString(source1HexString);
+ var source2HexString = "3082f1a7c9ee7aaab5172f2c0a5fba90";
+ var source2 = JobVertexID.fromHexString(source2HexString);
+ var source3HexString = "108440e208a8fe04cca0fee9d0161721";
+ var source3 = JobVertexID.fromHexString(source3HexString);
+ var joinOperator1HexString = "869fb403873411306404e9f2e4438c0e";
+ var joinOperator1 = JobVertexID.fromHexString(joinOperator1HexString);
+ var joinOperator2HexString = "50d0addd07a68c439013f2767a6bd813";
+ var joinOperator2 = JobVertexID.fromHexString(joinOperator2HexString);
+ var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+ var sink = JobVertexID.fromHexString(sinkHexString);
Review Comment:
Is it possible to simplify this test, such that it tests multiple inputs but
maybe not multiple branches?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -95,13 +106,23 @@ public int computeScaleTargetParallelism(
LOG.warn(
"Target data rate is not available for {}, cannot compute
new parallelism",
vertex);
- return currentParallelism;
+ return VertexScalingResult.normalScaling(currentParallelism);
}
+ targetCapacity *= backpropagationScaleFactor;
+
LOG.debug("Target processing capacity for {} is {}", vertex,
targetCapacity);
double scaleFactor = targetCapacity / averageTrueProcessingRate;
- double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
Review Comment:
Any reason to flip min/max here?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/VertexScalingResult.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/** Class for storing information on how a single vertex is scaled. */
+@AllArgsConstructor
+@Getter
+public class VertexScalingResult {
+ private int parallelism;
+ private double bottleneckScaleFactor;
Review Comment:
We should hint this must be `<= 1.0`, e.g. add a comment and will be used to
limit upstream vertices parallelism.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/IntermediateScalingResult.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Class for storing intermediate scaling results. */
+public class IntermediateScalingResult {
+
+ private final Map<JobVertexID, ScalingSummary> scalingSummaries;
+ private final List<JobVertexID> bottlenecks;
+
+ private double backpropagationScaleFactor = 1.0;
+
+ public IntermediateScalingResult() {
+ scalingSummaries = new HashMap<>();
+ bottlenecks = new ArrayList<>();
+ }
+
+ void addScalingSummary(JobVertexID vertex, ScalingSummary scalingSummary) {
+ scalingSummaries.put(vertex, scalingSummary);
+ }
+
+ void addBottleneckVertex(JobVertexID bottleneck, double factor) {
+ bottlenecks.add(bottleneck);
+ backpropagationScaleFactor = Math.min(backpropagationScaleFactor,
factor);
Review Comment:
Why is the scaling factor not kept on a per-vertex level? If there are two
vertices within different branches, they will influence each other, e.g. a
propgagation factor of 0.1 will override another with 0.9. I think we need to
account for it per input, similarly to how we propagate the actual rates.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]