This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.16.0.1-acs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85b44b46011c0797a8b9b0b377237709f57a0b55
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Mon Oct 10 18:30:09 2022 +0200

    [backport][FLINK-29501] Add option to override job vertex parallelisms 
during job submission
    
    This allows changing the job vertex parallelism of a JobGraph at job
    submission time without having to modify the JobGraph upfront.
    
    The initial idea was to add a new field to the payload of the job
    submission REST endpoint. However, it is probably more practical to handle
    the overrides in the same way as other PipelineOptions already do, i.e. via
    the configuration.
    
    The implementation is deliberately lenient with respect to the presence of
    job vertices. If vertices have been removed or new ones have been added,
    only the ones found will have their parallelism overridden. The verification
    should be performed by the caller, not by Flink. In particular, we want to
    support scenarios where users modify the deployment and we might not yet
    have overrides for all vertices.
---
 .../flink/configuration/PipelineOptions.java       |  9 +++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 17 +++++++++
 .../flink/runtime/dispatcher/DispatcherTest.java   | 42 ++++++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 0e04abf2b90..147dd1dc2ee 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.TextElement;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -165,6 +166,14 @@ public class PipelineOptions {
                             "Register a custom, serializable user 
configuration object. The configuration can be "
                                     + " accessed in operators");
 
+    public static final ConfigOption<Map<String, Integer>> 
PARALLELISM_OVERRIDES =
+            key("pipeline.jobvertex-parallelism-overrides")
+                    .mapType(Integer.class)
+                    .defaultValue(Collections.emptyMap())
+                    .withDescription(
+                            "A parallelism override map (jobVertexId -> 
parallelism) which will be used to update"
+                                    + " the parallelism of the corresponding 
job vertices of submitted JobGraphs.");
+
     public static final ConfigOption<Integer> MAX_PARALLELISM =
             key("pipeline.max-parallelism")
                     .intType()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index ca72be6f4d5..3b47a12aa48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -528,6 +529,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     }
 
     private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph 
jobGraph) {
+        applyParallelismOverrides(jobGraph);
         log.info("Submitting job '{}' ({}).", jobGraph.getName(), 
jobGraph.getJobID());
         return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, 
this::persistAndRunJob)
                 .handle((ignored, throwable) -> 
handleTermination(jobGraph.getJobID(), throwable))
@@ -1309,4 +1311,19 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
         return CompletableFuture.runAsync(() -> terminateJob(jobId), 
getMainThreadExecutor());
     }
+
+    private void applyParallelismOverrides(JobGraph jobGraph) {
+        Map<String, Integer> overrides = 
configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            Integer override = overrides.get(vertex.getID().toHexString());
+            if (override != null) {
+                log.info(
+                        "Changing job vertex {} parallelism from {} to {}",
+                        vertex.getID(),
+                        vertex.getParallelism(),
+                        override);
+                vertex.setParallelism(override);
+            }
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 0f8f130d5fb..153e74bdf20 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
@@ -88,6 +90,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
 import org.assertj.core.api.Assertions;
 import org.hamcrest.Matchers;
 import org.junit.After;
@@ -1031,6 +1035,44 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         InstantiationUtil.serializeObject(multipleJobsDetails);
     }
 
+    @Test
+    public void testOverridingJobVertexParallelisms() throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        v1.setParallelism(1);
+        JobVertex v2 = new JobVertex("v2");
+        v2.setParallelism(2);
+        JobVertex v3 = new JobVertex("v3");
+        v3.setParallelism(3);
+        jobGraph = new JobGraph(jobGraph.getJobID(), "job", v1, v2, v3);
+
+        configuration.set(
+                PipelineOptions.PARALLELISM_OVERRIDES,
+                ImmutableMap.of(
+                        v1.getID().toHexString(), 10,
+                        // v2 is omitted
+                        v3.getID().toHexString(), 42,
+                        // unknown vertex added
+                        new JobVertexID().toHexString(), 23));
+
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new ExpectedJobIdJobManagerRunnerFactory(
+                                jobId, createdJobManagerRunnerLatch));
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        
Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1);
+        
Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
+        
Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 3);
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        
Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 10);
+        
Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
+        
Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
+    }
+
     private JobManagerRunner runningJobManagerRunnerWithJobStatus(
             final JobStatus currentJobStatus) {
         Preconditions.checkArgument(!currentJobStatus.isTerminalState());

Reply via email to