This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new acc04da2 [FLINK-36527][autoscaler] Introduce a parameter to support
autoscaler adopt a more radical strategy when source vertex or upstream shuffle
is keyBy (#904)
acc04da2 is described below
commit acc04da2b863df87f407aca9e5018c45741a563c
Author: big face cat <[email protected]>
AuthorDate: Thu Nov 21 00:48:43 2024 +0800
[FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt
a more radical strategy when source vertex or upstream shuffle is keyBy (#904)
---
.../generated/auto_scaler_configuration.html | 6 ++
.../apache/flink/autoscaler/JobVertexScaler.java | 55 ++++++++++---
.../flink/autoscaler/config/AutoScalerOptions.java | 13 +++
.../flink/autoscaler/JobVertexScalerTest.java | 96 ++++++++++++++++++++--
4 files changed, 150 insertions(+), 20 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 49ff71ef..09baf217 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -188,6 +188,12 @@
<td>Duration</td>
<td>Time interval to resend the identical event</td>
</tr>
+ <tr>
+
<td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
+ <td style="word-wrap: break-word;">EVENLY_SPREAD</td>
+ <td><p>Enum</p></td>
+ <td>How to adjust the parallelism of Source vertex or upstream
shuffle is keyBy<br /><br />Possible values:<ul><li>"EVENLY_SPREAD": This mode
ensures that the parallelism adjustment attempts to evenly distribute data
across subtasks. It is particularly effective for source vertices that are
aware of partition counts or vertices after 'keyBy' operation. The goal is to
have the number of key groups or partitions be divisible by the set
parallelism, ensuring even data distributi [...]
+ </tr>
<tr>
<td><h5>job.autoscaler.stabilization.interval</h5></td>
<td style="word-wrap: break-word;">5 min</td>
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 1d32b1aa..87099bc0 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -25,6 +25,8 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -41,10 +43,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
+import static
org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -54,6 +58,7 @@ import static
org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTI
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
@@ -411,26 +416,29 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
var numKeyGroupsOrPartitions =
numSourcePartitions <= 0 ? maxParallelism :
numSourcePartitions;
- var upperBoundForAlignment =
- Math.min(
- // Optimize the case where newParallelism <=
maxParallelism / 2
- newParallelism > numKeyGroupsOrPartitions / 2
- ? numKeyGroupsOrPartitions
- : numKeyGroupsOrPartitions / 2,
- upperBound);
+ var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions,
upperBound);
+
+ KeyGroupOrPartitionsAdjustMode mode =
+
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
// When the shuffle type of vertex inputs contains keyBy or vertex is
a source,
// we try to adjust the parallelism such that it divides
// the numKeyGroupsOrPartitions without a remainder => data is evenly
spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
- if (numKeyGroupsOrPartitions % p == 0) {
+ if (numKeyGroupsOrPartitions % p == 0
+ ||
+ // When Mode is MAXIMIZE_UTILISATION , Try to find the
smallest parallelism
+ // that can satisfy the current consumption rate.
+ (mode == MAXIMIZE_UTILISATION
+ && numKeyGroupsOrPartitions / p
+ < numKeyGroupsOrPartitions /
newParallelism)) {
return p;
}
}
- // When adjust the parallelism after rounding up cannot be evenly
divided by
- // numKeyGroupsOrPartitions, Try to find the smallest parallelism that
can satisfy the
- // current consumption rate.
+ // When adjusting the parallelism after rounding up cannot
+ // find the parallelism to meet requirements.
+ // Try to find the smallest parallelism that can satisfy the current
consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions /
newParallelism) {
@@ -465,4 +473,29 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
protected void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}
+
+ /** The mode of the key group or parallelism adjustment. */
+ public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
+ EVENLY_SPREAD(
+ "This mode ensures that the parallelism adjustment attempts to
evenly distribute data across subtasks"
+ + ". It is particularly effective for source vertices
that are aware of partition counts or vertices after "
+ + "'keyBy' operation. The goal is to have the number
of key groups or partitions be divisible by the set parallelism, ensuring even
data distribution and reducing data skew."),
+
+ MAXIMIZE_UTILISATION(
+ "This model is to maximize resource utilization. In this mode,
an attempt is made to set"
+ + " the parallelism that meets the current consumption
rate requirements. It is not enforced "
+ + "that the number of key groups or partitions is
divisible by the parallelism."),
+ ;
+
+ private final InlineElement description;
+
+ KeyGroupOrPartitionsAdjustMode(String description) {
+ this.description = text(description);
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+ }
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e1ea6a86..a5ffb0f9 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -17,6 +17,7 @@
package org.apache.flink.autoscaler.config;
+import org.apache.flink.autoscaler.JobVertexScaler;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -351,4 +352,16 @@ public class AutoScalerOptions {
.withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
.withDescription(
"Quota of the CPU count. When scaling would go
beyond this number the the scaling is not going to happen.");
+
+ public static final
ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
+ SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
+
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
+
.enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
+ .defaultValue(
+
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
+ .withFallbackKeys(
+ oldOperatorConfigKey(
+
"scaling.key-group.partitions.adjust.mode"))
+ .withDescription(
+ "How to adjust the parallelism of Source
vertex or upstream shuffle is keyBy");
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index ae832073..704ddd58 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -323,8 +323,8 @@ public class JobVertexScalerTest {
@MethodSource("adjustmentInputsProvider")
public void testParallelismComputationWithAdjustment(
Collection<ShipStrategy> inputShipStrategies) {
- final int minParallelism = 1;
- final int maxParallelism = Integer.MAX_VALUE;
+ final int parallelismLowerLimit = 1;
+ final int parallelismUpperLimit = Integer.MAX_VALUE;
final var vertex = new JobVertexID();
assertEquals(
@@ -336,8 +336,8 @@ public class JobVertexScalerTest {
0,
36,
0.8,
- minParallelism,
- maxParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -349,8 +349,8 @@ public class JobVertexScalerTest {
0,
128,
1.5,
- minParallelism,
- maxParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -362,8 +362,8 @@ public class JobVertexScalerTest {
0,
720,
1.3,
- minParallelism,
- maxParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -375,7 +375,44 @@ public class JobVertexScalerTest {
0,
720,
Integer.MAX_VALUE,
- minParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
+
+ int maxParallelism = 128;
+ double scaleFactor = 2.5;
+ int currentParallelism = 10;
+ int expectedEvenly = 32;
+ int expectedMaximumUtilization = 26;
+ assertEquals(
+ expectedEvenly,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ inputShipStrategies,
+ 0,
+ maxParallelism,
+ scaleFactor,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
+
+ Configuration conf = context.getConfiguration();
+ conf.set(
+ AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+ assertEquals(
+ expectedMaximumUtilization,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ inputShipStrategies,
+ 0,
+ maxParallelism,
+ scaleFactor,
+ parallelismLowerLimit,
maxParallelism,
eventCollector,
context));
@@ -1004,6 +1041,47 @@ public class JobVertexScalerTest {
parallelismUpperLimit,
eventCollector,
context));
+
+ int partition = 199;
+ double scaleFactor = 4;
+ int currentParallelism = 24;
+ int expectedEvenly = 199;
+ // At MAXIMIZE_UTILISATION, 99 subtasks consume two partitions,
+ // one subtask consumes one partition.
+ int expectedMaximumUtilization = 100;
+
+ assertEquals(
+ expectedEvenly,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ List.of(),
+ partition,
+ parallelismUpperLimit,
+ scaleFactor,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
+
+ Configuration conf = context.getConfiguration();
+ conf.set(
+ AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+
+ assertEquals(
+ expectedMaximumUtilization,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ List.of(),
+ partition,
+ parallelismUpperLimit,
+ scaleFactor,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
}
@Test