tillrohrmann commented on a change in pull request #15071:
URL: https://github.com/apache/flink/pull/15071#discussion_r591307211



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for reactive mode. */
+public enum ReactiveModeUtils {
+    ;
+    private static final Logger LOG = LoggerFactory.getLogger(ReactiveModeUtils.class);
+
+    /**
+     * Sets the parallelism of all vertexes in the passed JobGraph to the highest possible max

Review comment:
       ```suggestion
        * Sets the parallelism of all vertices in the passed JobGraph to the highest possible max
       ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for reactive mode. */
+public enum ReactiveModeUtils {

Review comment:
       Personally, I don't have anything against using an `enum` for utility
classes, but there has been some discussion in the past that it might be
misleading. I think the current consensus is to use final classes with a
private constructor.
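
For illustration, the convention mentioned here (a final class with a private constructor) can be sketched as follows. `ExampleUtils` and `clampToMax` are hypothetical names that do not appear in the PR; this is a minimal sketch of the pattern, not the actual Flink code.

```java
/** Hypothetical utility class; the name and its method are illustrative only. */
final class ExampleUtils {

    /** Private constructor: the class only hosts static helpers and is never instantiated. */
    private ExampleUtils() {}

    /** Returns the requested value, capped at the given maximum. */
    static int clampToMax(int requested, int max) {
        return Math.min(requested, max);
    }
}
```

Callers use it as `ExampleUtils.clampToMax(8, 4)`. Unlike the `enum` variant, the class exposes no `values()` or `valueOf()` methods.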

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for reactive mode. */
+public enum ReactiveModeUtils {
+    ;
+    private static final Logger LOG = LoggerFactory.getLogger(ReactiveModeUtils.class);
+
+    /**
+     * Sets the parallelism of all vertexes in the passed JobGraph to the highest possible max
+     * parallelism, unless the user defined a maxParallelism.
+     *
+     * @param jobGraph The JobGraph to modify.
+     */
+    public static void configureJobGraphForReactiveMode(JobGraph jobGraph) {
+        LOG.info("Modifying job parallelism for running in reactive mode.");
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            if (vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT) {
+                vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
+                vertex.setMaxParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
+            } else {
+                vertex.setParallelism(vertex.getMaxParallelism());
+            }
+        }
+    }

Review comment:
       In particular, stating that we ignore any configured parallelism.
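
To make the behaviour concrete, here is a minimal, self-contained sketch of the logic in the hunk above. `VertexModel` is a simplified stand-in and not the Flink `JobVertex` class, and the two constant values are assumptions chosen for illustration.

```java
import java.util.List;

/** Simplified stand-in for a job vertex; NOT the Flink JobVertex class. */
final class VertexModel {
    /** Assumed sentinel meaning "the user did not set a max parallelism". */
    static final int MAX_PARALLELISM_DEFAULT = -1;
    /** Assumed upper bound for the max parallelism. */
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    int parallelism;
    int maxParallelism;

    VertexModel(int parallelism, int maxParallelism) {
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }
}

/** Mirrors the branch structure of configureJobGraphForReactiveMode on the stand-in type. */
final class ReactiveModeSketch {
    private ReactiveModeSketch() {}

    static void configureForReactiveMode(List<VertexModel> vertices) {
        for (VertexModel vertex : vertices) {
            if (vertex.maxParallelism == VertexModel.MAX_PARALLELISM_DEFAULT) {
                // No user-defined max parallelism: both values go to the upper bound.
                vertex.parallelism = VertexModel.UPPER_BOUND_MAX_PARALLELISM;
                vertex.maxParallelism = VertexModel.UPPER_BOUND_MAX_PARALLELISM;
            } else {
                // User-defined max parallelism: it becomes the parallelism.
                vertex.parallelism = vertex.maxParallelism;
            }
        }
    }
}
```

In both branches the previously configured `parallelism` is overwritten: a vertex created with parallelism 4 and no max parallelism ends up at the upper bound, and one created with parallelism 4 and max parallelism 2 ends up at 2.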

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            .setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+        conf.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        ParallelismTrackingSource.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        ParallelismTrackingSink.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // expect scale up to 2 TaskManagers:
+        ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+
+        miniClusterResource.getMiniCluster().startTaskManager();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    @Test
+    public void testScaleDownOnTaskManagerLoss() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>());
+
+        ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // scale down to 1 TaskManagers:
+        ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
+        return miniClusterResource
+                .getMiniCluster()
+                .requestClusterOverview()
+                .get()
+                .getNumTaskManagersConnected();
+    }
+
+    private void startAdditionalTaskManager() throws Exception {
+        miniClusterResource.getMiniCluster().startTaskManager();
+        CommonTestUtils.waitUntilCondition(
+                () -> getNumberOfConnectedTaskManagers() == 2,
+                Deadline.fromNow(Duration.ofMillis(10_000L)));
+    }
+
+    private static class ParallelismTrackingSource implements ParallelSourceFunction<String> {
+        private volatile boolean running = true;
+
+        private static final InstanceTracker instances = new InstanceTracker();
+
+        public static void expectInstances(int count) {
+            instances.expectInstances(count);
+        }
+
+        public static void waitForInstances() throws InterruptedException {
+            instances.waitForInstances();
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            instances.reportNewInstance();
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect("test");
+                }
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    private static class ParallelismTrackingSink<T> extends RichSinkFunction<T> {
+
+        private static final InstanceTracker instances = new InstanceTracker();
+
+        public static void expectInstances(int count) {
+            instances.expectInstances(count);
+        }
+
+        public static void waitForInstances() throws InterruptedException {
+            instances.waitForInstances();
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            instances.reportNewInstance();
+        }
+    }
+
+    private static class InstanceTracker {
+        private final Object lock = new Object();
+
+        @GuardedBy("lock")
+        private CountDownLatch latch = new CountDownLatch(0);
+
+        public void reportNewInstance() {
+            synchronized (lock) {
+                if (latch.getCount() == 0) {
+                    throw new RuntimeException("Test error. More instances than expected.");
+                }
+                latch.countDown();
+            }
+        }
+
+        public void waitForInstances() throws InterruptedException {
+            while (true) {
+                synchronized (lock) {
+                    if (latch.await(10, TimeUnit.MILLISECONDS)) {
+                        break;
+                    }
+                }
+            }

Review comment:
       Why not simply `synchronized(lock) { latch.await(); }`?
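
One point to keep in mind for any simplification: `reportNewInstance` synchronizes on the same `lock`, so a thread that blocks in `latch.await()` while holding `lock` would prevent instances from counting down. A possible variant, sketched under assumptions, reads the latch reference under the lock and awaits outside of it. The body of `expectInstances` and the timeout parameters are assumptions, since the hunk above does not show them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch of an instance tracker; not the class from the PR. */
final class InstanceTrackerSketch {
    private final Object lock = new Object();

    // Guarded by lock. Starts at zero so that unexpected instances are detected.
    private CountDownLatch latch = new CountDownLatch(0);

    /** Assumed behaviour: replaces the latch with one that expects {@code count} instances. */
    void expectInstances(int count) {
        synchronized (lock) {
            latch = new CountDownLatch(count);
        }
    }

    void reportNewInstance() {
        synchronized (lock) {
            if (latch.getCount() == 0) {
                throw new IllegalStateException("Test error. More instances than expected.");
            }
            latch.countDown();
        }
    }

    /** Waits without holding the lock; returns false if the timeout elapses first. */
    boolean waitForInstances(long timeout, TimeUnit unit) throws InterruptedException {
        final CountDownLatch current;
        synchronized (lock) {
            current = latch; // only the read of the reference needs the lock
        }
        return current.await(timeout, unit);
    }
}
```

Because the waiting thread only holds `lock` while copying the reference, reporting threads are never blocked by it, and no polling loop is needed.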

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_WITH_CLIENT_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            .setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    @Test
+    public void testScaleUpAndDownWithMaxParallelism() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1); // we set parallelism to ensure it's overwritten
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        // we set maxParallelism = 1 and assert it never exceeds it
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);

Review comment:
       It seems odd that we need to use the internal API to set the max
parallelism. I think this is not right. I would suggest filing a JIRA issue to
fix this problem.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            .setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();

Review comment:
       `getTransformation` is still there. Please remove it.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for reactive mode. */
+public enum ReactiveModeUtils {
+    ;
+    private static final Logger LOG = LoggerFactory.getLogger(ReactiveModeUtils.class);
+
+    /**
+     * Sets the parallelism of all vertexes in the passed JobGraph to the highest possible max
+     * parallelism, unless the user defined a maxParallelism.
+     *
+     * @param jobGraph The JobGraph to modify.
+     */
+    public static void configureJobGraphForReactiveMode(JobGraph jobGraph) {
+        LOG.info("Modifying job parallelism for running in reactive mode.");
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            if (vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT) {
+                vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
+                vertex.setMaxParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
+            } else {
+                vertex.setParallelism(vertex.getMaxParallelism());
+            }
+        }
+    }

Review comment:
       I guess we should update the FLIP with the specifics here. I think we
previously said that we expect no parallelism to have been configured. Looking
at this code, the behaviour is slightly different now.





