This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f01f8ec0 [FLINK-33812] Consider override parallelism when calculating
the parallelism of a job
f01f8ec0 is described below
commit f01f8ec0dcde79b21c164143128f0c30ef1aa091
Author: Jerry Wang <[email protected]>
AuthorDate: Fri Dec 22 17:33:26 2023 +0800
[FLINK-33812] Consider override parallelism when calculating the
parallelism of a job
---
.../kubernetes/operator/config/FlinkConfigBuilder.java | 16 ++++++++++++++++
.../operator/config/FlinkConfigBuilderTest.java | 14 ++++++++++++++
2 files changed, 30 insertions(+)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index f99c08fc..39d0fed6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -68,6 +68,7 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Optional;
import static
org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
import static
org.apache.flink.configuration.DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR;
@@ -375,9 +376,24 @@ public class FlinkConfigBuilder {
* effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
}
+ Optional<Integer> maxOverrideParallelism =
getMaxParallelismFromOverrideConfig();
+ if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get()
> 0) {
+ return maxOverrideParallelism.get();
+ }
+
return spec.getJob().getParallelism();
}
+ private Optional<Integer> getMaxParallelismFromOverrideConfig() {
+ return effectiveConfig
+ .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
+ .flatMap(
+ overrides ->
+ overrides.values().stream()
+ .map(Integer::valueOf)
+ .max(Integer::compareTo));
+ }
+
protected Configuration build() {
// Set cluster config
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 00b418eb..4c905c3f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -854,6 +854,20 @@ public class FlinkConfigBuilderTest {
5,
configuration.get(
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+
+ dep.getSpec()
+ .getFlinkConfiguration()
+ .put(PipelineOptions.PARALLELISM_OVERRIDES.key(),
"vertex1:10,vertex2:20");
+ configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyTaskManagerSpec()
+ .applyJobOrSessionSpec()
+ .build();
+ assertEquals(
+ 10,
+ configuration.get(
+
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
}
@Test