This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new acc04da2 [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy (#904)
acc04da2 is described below

commit acc04da2b863df87f407aca9e5018c45741a563c
Author: big face cat <[email protected]>
AuthorDate: Thu Nov 21 00:48:43 2024 +0800

    [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy (#904)
---
 .../generated/auto_scaler_configuration.html       |  6 ++
 .../apache/flink/autoscaler/JobVertexScaler.java   | 55 ++++++++++---
 .../flink/autoscaler/config/AutoScalerOptions.java | 13 +++
 .../flink/autoscaler/JobVertexScalerTest.java      | 96 ++++++++++++++++++++--
 4 files changed, 150 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 49ff71ef..09baf217 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -188,6 +188,12 @@
             <td>Duration</td>
             <td>Time interval to resend the identical event</td>
         </tr>
+        <tr>
+            
<td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
+            <td style="word-wrap: break-word;">EVENLY_SPREAD</td>
+            <td><p>Enum</p></td>
+            <td>How to adjust the parallelism of Source vertex or upstream 
shuffle is keyBy<br /><br />Possible values:<ul><li>"EVENLY_SPREAD": This mode 
ensures that the parallelism adjustment attempts to evenly distribute data 
across subtasks. It is particularly effective for source vertices that are 
aware of partition counts or vertices after 'keyBy' operation. The goal is to 
have the number of key groups or partitions be divisible by the set 
parallelism, ensuring even data distributi [...]
+        </tr>
         <tr>
             <td><h5>job.autoscaler.stabilization.interval</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 1d32b1aa..87099bc0 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -25,6 +25,8 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
@@ -41,10 +43,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.SortedMap;
 
+import static 
org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -54,6 +58,7 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTI
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
@@ -411,26 +416,29 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
         var numKeyGroupsOrPartitions =
                 numSourcePartitions <= 0 ? maxParallelism : 
numSourcePartitions;
-        var upperBoundForAlignment =
-                Math.min(
-                        // Optimize the case where newParallelism <= 
maxParallelism / 2
-                        newParallelism > numKeyGroupsOrPartitions / 2
-                                ? numKeyGroupsOrPartitions
-                                : numKeyGroupsOrPartitions / 2,
-                        upperBound);
+        var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, 
upperBound);
+
+        KeyGroupOrPartitionsAdjustMode mode =
+                
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
 
         // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source,
         // we try to adjust the parallelism such that it divides
         // the numKeyGroupsOrPartitions without a remainder => data is evenly 
spread across subtasks
         for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-            if (numKeyGroupsOrPartitions % p == 0) {
+            if (numKeyGroupsOrPartitions % p == 0
+                    ||
+                    // When Mode is MAXIMIZE_UTILISATION , Try to find the 
smallest parallelism
+                    // that can satisfy the current consumption rate.
+                    (mode == MAXIMIZE_UTILISATION
+                            && numKeyGroupsOrPartitions / p
+                                    < numKeyGroupsOrPartitions / 
newParallelism)) {
                 return p;
             }
         }
 
-        // When adjust the parallelism after rounding up cannot be evenly 
divided by
-        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that 
can satisfy the
-        // current consumption rate.
+        // When adjusting the parallelism after rounding up cannot
+        // find the parallelism to meet requirements.
+        // Try to find the smallest parallelism that can satisfy the current 
consumption rate.
         int p = newParallelism;
         for (; p > 0; p--) {
             if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / 
newParallelism) {
@@ -465,4 +473,29 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     protected void setClock(Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
     }
+
+    /** The mode of the key group or parallelism adjustment. */
+    public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
+        EVENLY_SPREAD(
+                "This mode ensures that the parallelism adjustment attempts to 
evenly distribute data across subtasks"
+                        + ". It is particularly effective for source vertices 
that are aware of partition counts or vertices after "
+                        + "'keyBy' operation. The goal is to have the number 
of key groups or partitions be divisible by the set parallelism, ensuring even 
data distribution and reducing data skew."),
+
+        MAXIMIZE_UTILISATION(
+                "This model is to maximize resource utilization. In this mode, 
an attempt is made to set"
+                        + " the parallelism that meets the current consumption 
rate requirements. It is not enforced "
+                        + "that the number of key groups or partitions is 
divisible by the parallelism."),
+        ;
+
+        private final InlineElement description;
+
+        KeyGroupOrPartitionsAdjustMode(String description) {
+            this.description = text(description);
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e1ea6a86..a5ffb0f9 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.JobVertexScaler;
 import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -351,4 +352,16 @@ public class AutoScalerOptions {
                     .withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
                     .withDescription(
                             "Quota of the CPU count. When scaling would go 
beyond this number the the scaling is not going to happen.");
+
+    public static final 
ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
+            SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
+                    
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
+                            
.enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
+                            .defaultValue(
+                                    
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
+                            .withFallbackKeys(
+                                    oldOperatorConfigKey(
+                                            
"scaling.key-group.partitions.adjust.mode"))
+                            .withDescription(
+                                    "How to adjust the parallelism of Source 
vertex or upstream shuffle is keyBy");
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index ae832073..704ddd58 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -323,8 +323,8 @@ public class JobVertexScalerTest {
     @MethodSource("adjustmentInputsProvider")
     public void testParallelismComputationWithAdjustment(
             Collection<ShipStrategy> inputShipStrategies) {
-        final int minParallelism = 1;
-        final int maxParallelism = Integer.MAX_VALUE;
+        final int parallelismLowerLimit = 1;
+        final int parallelismUpperLimit = Integer.MAX_VALUE;
         final var vertex = new JobVertexID();
 
         assertEquals(
@@ -336,8 +336,8 @@ public class JobVertexScalerTest {
                         0,
                         36,
                         0.8,
-                        minParallelism,
-                        maxParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
                         eventCollector,
                         context));
         assertEquals(
@@ -349,8 +349,8 @@ public class JobVertexScalerTest {
                         0,
                         128,
                         1.5,
-                        minParallelism,
-                        maxParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
                         eventCollector,
                         context));
         assertEquals(
@@ -362,8 +362,8 @@ public class JobVertexScalerTest {
                         0,
                         720,
                         1.3,
-                        minParallelism,
-                        maxParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
                         eventCollector,
                         context));
         assertEquals(
@@ -375,7 +375,44 @@ public class JobVertexScalerTest {
                         0,
                         720,
                         Integer.MAX_VALUE,
-                        minParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
+
+        int maxParallelism = 128;
+        double scaleFactor = 2.5;
+        int currentParallelism = 10;
+        int expectedEvenly = 32;
+        int expectedMaximumUtilization = 26;
+        assertEquals(
+                expectedEvenly,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        inputShipStrategies,
+                        0,
+                        maxParallelism,
+                        scaleFactor,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
+
+        Configuration conf = context.getConfiguration();
+        conf.set(
+                AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+                
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+        assertEquals(
+                expectedMaximumUtilization,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        inputShipStrategies,
+                        0,
+                        maxParallelism,
+                        scaleFactor,
+                        parallelismLowerLimit,
                         maxParallelism,
                         eventCollector,
                         context));
@@ -1004,6 +1041,47 @@ public class JobVertexScalerTest {
                         parallelismUpperLimit,
                         eventCollector,
                         context));
+
+        int partition = 199;
+        double scaleFactor = 4;
+        int currentParallelism = 24;
+        int expectedEvenly = 199;
+        // At MAXIMIZE_UTILISATION, 99 subtasks consume two partitions,
+        // one subtask consumes one partition.
+        int expectedMaximumUtilization = 100;
+
+        assertEquals(
+                expectedEvenly,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        List.of(),
+                        partition,
+                        parallelismUpperLimit,
+                        scaleFactor,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
+
+        Configuration conf = context.getConfiguration();
+        conf.set(
+                AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+                
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+
+        assertEquals(
+                expectedMaximumUtilization,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        List.of(),
+                        partition,
+                        parallelismUpperLimit,
+                        scaleFactor,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
     }
 
     @Test

Reply via email to