SamBarker commented on code in PR #904:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/904#discussion_r1804328631


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -416,21 +416,31 @@ protected static <KEY, Context extends 
JobAutoScalerContext<KEY>> int scale(
                         // Optimize the case where newParallelism <= 
maxParallelism / 2
                         newParallelism > numKeyGroupsOrPartitions / 2
                                 ? numKeyGroupsOrPartitions
-                                : numKeyGroupsOrPartitions / 2,
+                                : numKeyGroupsOrPartitions / 2 + 
numKeyGroupsOrPartitions % 2,
                         upperBound);
 
+        boolean scalingRadical =
+                
context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED);
+
         // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source,
         // we try to adjust the parallelism such that it divides
         // the numKeyGroupsOrPartitions without a remainder => data is evenly 
spread across subtasks
         for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-            if (numKeyGroupsOrPartitions % p == 0) {
+            if (numKeyGroupsOrPartitions % p == 0
+                    ||
+                    // When scaling radical is enabled, Try to find the 
smallest parallelism that
+                    // can satisfy the
+                    // current consumption rate.
+                    (scalingRadical
+                            && numKeyGroupsOrPartitions / p
+                                    < numKeyGroupsOrPartitions / 
newParallelism)) {

Review Comment:
   I think extracting this as a method `canMaximiseUtilisation` the intent is 
to make the intent of the condition easier to understand when working through 
the code. 



##########
docs/layouts/shortcodes/generated/auto_scaler_configuration.html:
##########
@@ -188,6 +188,12 @@
             <td>Duration</td>
             <td>Time interval to resend the identical event</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.scaling.radical.enabled</h5></td>

Review Comment:
   Coming at this cold, its not at all clear to me what radical means. While 
the description goes some way towards clarifying the intent it doesn't feel 
like a great term, additionally following through the JIRA links `radical` 
feels like a very off term for a default (assuming I'm following properly). I 
wonder if `job.autoscaler.scaling.maximizeUtilisation.enabled` would make 
things more explicit?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -416,21 +416,31 @@ protected static <KEY, Context extends 
JobAutoScalerContext<KEY>> int scale(
                         // Optimize the case where newParallelism <= 
maxParallelism / 2
                         newParallelism > numKeyGroupsOrPartitions / 2
                                 ? numKeyGroupsOrPartitions
-                                : numKeyGroupsOrPartitions / 2,
+                                : numKeyGroupsOrPartitions / 2 + 
numKeyGroupsOrPartitions % 2,
                         upperBound);
 
+        boolean scalingRadical =
+                
context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED);
+
         // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source,
         // we try to adjust the parallelism such that it divides
         // the numKeyGroupsOrPartitions without a remainder => data is evenly 
spread across subtasks
         for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-            if (numKeyGroupsOrPartitions % p == 0) {
+            if (numKeyGroupsOrPartitions % p == 0
+                    ||
+                    // When scaling radical is enabled, Try to find the 
smallest parallelism that
+                    // can satisfy the
+                    // current consumption rate.
+                    (scalingRadical
+                            && numKeyGroupsOrPartitions / p
+                                    < numKeyGroupsOrPartitions / 
newParallelism)) {
                 return p;
             }
         }
 
-        // When adjust the parallelism after rounding up cannot be evenly 
divided by
-        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that 
can satisfy the
-        // current consumption rate.
+        // When adjust the parallelism after rounding up cannot be
+        // find the right degree of parallelism to meet requirements,
+        // Try to find the smallest parallelism that can satisfy the current 
consumption rate.

Review Comment:
   nits
   ```suggestion
           // When adjusting the parallelism after rounding up cannot
           // find the right degree of parallelism to meet requirements.
           // Try to find the smallest parallelism that can satisfy the current 
consumption rate.
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -351,4 +351,13 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
                     .withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
                     .withDescription(
                             "Quota of the CPU count. When scaling would go 
beyond this number the the scaling is not going to happen.");
+
+    public static final ConfigOption<Boolean> SCALING_RADICAL_ENABLED =
+            autoScalerConfig("scaling.radical.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    
.withFallbackKeys(oldOperatorConfigKey("scaling.radical.enabled"))
+                    .withDescription(
+                            "If this option is enabled, The determination of 
parallelism will be more radical, which"
+                                    + " will maximize resource utilization, 
but may also cause data skew in some vertex.");

Review Comment:
   I think it might be helpful to give consumers/users some more context as to 
how/why it would potentially cause skew.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to