This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f01f8ec0 [FLINK-33812] Consider override parallelism when calculating 
the parallelism of a job
f01f8ec0 is described below

commit f01f8ec0dcde79b21c164143128f0c30ef1aa091
Author: Jerry Wang <[email protected]>
AuthorDate: Fri Dec 22 17:33:26 2023 +0800

    [FLINK-33812] Consider override parallelism when calculating the 
parallelism of a job
---
 .../kubernetes/operator/config/FlinkConfigBuilder.java   | 16 ++++++++++++++++
 .../operator/config/FlinkConfigBuilderTest.java          | 14 ++++++++++++++
 2 files changed, 30 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index f99c08fc..39d0fed6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -68,6 +68,7 @@ import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
 
 import static 
org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
 import static 
org.apache.flink.configuration.DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR;
@@ -375,9 +376,24 @@ public class FlinkConfigBuilder {
                     * effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
         }
 
+        Optional<Integer> maxOverrideParallelism = 
getMaxParallelismFromOverrideConfig();
+        if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get() 
> 0) {
+            return maxOverrideParallelism.get();
+        }
+
         return spec.getJob().getParallelism();
     }
 
+    private Optional<Integer> getMaxParallelismFromOverrideConfig() {
+        return effectiveConfig
+                .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
+                .flatMap(
+                        overrides ->
+                                overrides.values().stream()
+                                        .map(Integer::valueOf)
+                                        .max(Integer::compareTo));
+    }
+
     protected Configuration build() {
 
         // Set cluster config
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 00b418eb..4c905c3f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -854,6 +854,20 @@ public class FlinkConfigBuilderTest {
                 5,
                 configuration.get(
                         
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+
+        dep.getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), 
"vertex1:10,vertex2:20");
+        configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyTaskManagerSpec()
+                        .applyJobOrSessionSpec()
+                        .build();
+        assertEquals(
+                10,
+                configuration.get(
+                        
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
     }
 
     @Test

Reply via email to