rmetzger commented on a change in pull request #15071:
URL: https://github.com/apache/flink/pull/15071#discussion_r586557140



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -248,9 +250,24 @@ public AdaptiveScheduler(
                         declarativeSlotPool::reserveFreeSlot,
                         declarativeSlotPool::freeReservedSlot);
 
-        for (JobVertex vertex : jobGraph.getVertices()) {
-            if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_DEFAULT) {
-                vertex.setParallelism(1);
+        if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
+                == SchedulerExecutionMode.REACTIVE) {
+            LOG.info("Modifying job parallelism for running in reactive 
mode.");
+            for (JobVertex vertex : jobGraph.getVertices()) {
+                if (vertex.getMaxParallelism() == 
JobVertex.MAX_PARALLELISM_DEFAULT) {
+                    
vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);

Review comment:
       I'm not sure if we can detect whether the user has set the parallelism 
or not. At least the LocalStreamEnvironment and the TestStreamEnvironment (when 
used with MiniClusterWithClientResource) are setting a default parallelism in 
the ExecutionConfig, which will be set by the Api to JobGraph translation 
process. (In case of the LocalStreamEnvironment, it'll be the number of CPU 
cores, in TestStreamEnvironment, the number of slots)
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to