rmetzger commented on a change in pull request #15071:
URL: https://github.com/apache/flink/pull/15071#discussion_r586557140
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -248,9 +250,24 @@ public AdaptiveScheduler(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot);
- for (JobVertex vertex : jobGraph.getVertices()) {
- if (vertex.getParallelism() ==
ExecutionConfig.PARALLELISM_DEFAULT) {
- vertex.setParallelism(1);
+ if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
+ == SchedulerExecutionMode.REACTIVE) {
+ LOG.info("Modifying job parallelism for running in reactive
mode.");
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ if (vertex.getMaxParallelism() ==
JobVertex.MAX_PARALLELISM_DEFAULT) {
+
vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
Review comment:
I'm not sure if we can detect whether the user has set the parallelism
or not. At least the LocalStreamEnvironment and the TestStreamEnvironment (when
used with MiniClusterWithClientResource) are setting a default parallelism in
the ExecutionConfig, which will be set by the Api to JobGraph translation
process. (In case of the LocalStreamEnvironment, it'll be the number of CPU
cores, in TestStreamEnvironment, the number of slots)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]