This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch release-1.16.0.1-acs in repository https://gitbox.apache.org/repos/asf/flink.git
commit 85b44b46011c0797a8b9b0b377237709f57a0b55 Author: Maximilian Michels <m...@apache.org> AuthorDate: Mon Oct 10 18:30:09 2022 +0200 [backport][FLINK-29501] Add option to override job vertex parallelisms during job submission This allows changing the job vertex parallelism of a JobGraph at job submission time without having to modify the JobGraph upfront. The initial idea was to add a new field to the payload of the job submit REST endpoint. However, it is probably more practical to handle the overrides in the same way as other PipelineOptions already do it, i.e. via the configuration. The implementation is deliberately lenient with respect to the presence of job vertices. If vertices have been removed or new ones have been added, only the ones found will have their parallelism overridden. The verification should be performed by the caller, not by Flink. In particular, we want to support scenarios where users modify the deployment and we might not yet have overrides for all vertices. --- .../flink/configuration/PipelineOptions.java | 9 +++++ .../flink/runtime/dispatcher/Dispatcher.java | 17 +++++++++ .../flink/runtime/dispatcher/DispatcherTest.java | 42 ++++++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index 0e04abf2b90..147dd1dc2ee 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -165,6 +166,14 @@ public class PipelineOptions { "Register a custom, serializable user configuration object. 
The configuration can be " + " accessed in operators"); + public static final ConfigOption<Map<String, Integer>> PARALLELISM_OVERRIDES = + key("pipeline.jobvertex-parallelism-overrides") + .mapType(Integer.class) + .defaultValue(Collections.emptyMap()) + .withDescription( + "A parallelism override map (jobVertexId -> parallelism) which will be used to update" + + " the parallelism of the corresponding job vertices of submitted JobGraphs."); + public static final ConfigOption<Integer> MAX_PARALLELISM = key("pipeline.max-parallelism") .intType() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index ca72be6f4d5..3b47a12aa48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.blob.BlobServer; @@ -528,6 +529,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> } private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { + applyParallelismOverrides(jobGraph); log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID()); return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob) .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable)) @@ -1309,4 +1311,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> public CompletableFuture<Void> 
onRemovedJobGraph(JobID jobId) { return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor()); } + + private void applyParallelismOverrides(JobGraph jobGraph) { + Map<String, Integer> overrides = configuration.get(PipelineOptions.PARALLELISM_OVERRIDES); + for (JobVertex vertex : jobGraph.getVertices()) { + Integer override = overrides.get(vertex.getID().toHexString()); + if (override != null) { + log.info( + "Changing job vertex {} parallelism from {} to {}", + vertex.getID(), + vertex.getParallelism(), + override); + vertex.setParallelism(override); + } + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 0f8f130d5fb..153e74bdf20 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; @@ -88,6 +90,8 @@ import org.apache.flink.util.InstantiationUtil; 
import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; + import org.assertj.core.api.Assertions; import org.hamcrest.Matchers; import org.junit.After; @@ -1031,6 +1035,44 @@ public class DispatcherTest extends AbstractDispatcherTest { InstantiationUtil.serializeObject(multipleJobsDetails); } + @Test + public void testOverridingJobVertexParallelisms() throws Exception { + JobVertex v1 = new JobVertex("v1"); + v1.setParallelism(1); + JobVertex v2 = new JobVertex("v2"); + v2.setParallelism(2); + JobVertex v3 = new JobVertex("v3"); + v3.setParallelism(3); + jobGraph = new JobGraph(jobGraph.getJobID(), "job", v1, v2, v3); + + configuration.set( + PipelineOptions.PARALLELISM_OVERRIDES, + ImmutableMap.of( + v1.getID().toHexString(), 10, + // v2 is omitted + v3.getID().toHexString(), 42, + // unknown vertex added + new JobVertexID().toHexString(), 23)); + + dispatcher = + createAndStartDispatcher( + heartbeatServices, + haServices, + new ExpectedJobIdJobManagerRunnerFactory( + jobId, createdJobManagerRunnerLatch)); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1); + Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2); + Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 3); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 10); + Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2); + Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42); + } + private JobManagerRunner runningJobManagerRunnerWithJobStatus( final JobStatus currentJobStatus) { Preconditions.checkArgument(!currentJobStatus.isTerminalState());