tillrohrmann commented on a change in pull request #15071:
URL: https://github.com/apache/flink/pull/15071#discussion_r588317421



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
##########
@@ -58,13 +58,18 @@
     private static final Logger LOG =
             
LoggerFactory.getLogger(DefaultSlotPoolServiceSchedulerFactory.class);
 
+    private final JobManagerOptions.SchedulerType schedulerType;
+
     private final SlotPoolServiceFactory slotPoolServiceFactory;
 
     private final SchedulerNGFactory schedulerNGFactory;
 
     private DefaultSlotPoolServiceSchedulerFactory(
-            SlotPoolServiceFactory slotPoolServiceFactory, SchedulerNGFactory 
schedulerNGFactory) {
+            SlotPoolServiceFactory slotPoolServiceFactory,
+            JobManagerOptions.SchedulerType schedulerType,
+            SchedulerNGFactory schedulerNGFactory) {

Review comment:
       Isn't the `SchedulerType` defined by the `SchedulerNGFactory`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -287,6 +291,23 @@ public void onUnknownDeploymentsOf(
 
         log.info("Initializing job {} ({}).", jobName, jid);
 
+        if 
(jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
+                == SchedulerExecutionMode.REACTIVE) {
+            Preconditions.checkState(
+                    slotPoolServiceSchedulerFactory.getSchedulerType()
+                            == JobManagerOptions.SchedulerType.Adaptive,
+                    "Expecting Adaptive Scheduler for reactive mode");

Review comment:
       I am wondering whether the `SchedulerExecutionMode` should influence 
which scheduler is chosen. That way we wouldn't need this precondition here.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            
.setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a 
parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new 
ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new 
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the 
cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();

Review comment:
       ```suggestion
           input.addSink(new ParallelismTrackingSink<>());
   ```

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            
.setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a 
parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new 
ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new 
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the 
cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        ParallelismTrackingSource.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        ParallelismTrackingSink.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // expect scale up to 2 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        miniClusterResource.getMiniCluster().startTaskManager();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    @Test
+    public void testScaleDownOnTaskManagerLoss() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // scale down to 1 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    private int getNumberOfConnectedTaskManagers() throws ExecutionException, 
InterruptedException {
+        return miniClusterResource
+                .getMiniCluster()
+                .requestClusterOverview()
+                .get()
+                .getNumTaskManagersConnected();
+    }
+
+    private void startAdditionalTaskManager() throws Exception {
+        miniClusterResource.getMiniCluster().startTaskManager();
+        CommonTestUtils.waitUntilCondition(
+                () -> getNumberOfConnectedTaskManagers() == 2,
+                Deadline.fromNow(Duration.ofMillis(10_000L)));
+    }
+
+    private static class ParallelismTrackingSource implements 
ParallelSourceFunction<String> {
+        private volatile boolean running = true;
+
+        @GuardedBy("instances")
+        private static CountDownLatch instances;

Review comment:
       I don't think that you should use a non-final field as a lock. I would 
suggest introducing a dedicated final lock object. Moreover, we want to 
synchronize writes/reads to `instances`, which does not work if you use 
`instances` itself as the lock.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -287,6 +291,23 @@ public void onUnknownDeploymentsOf(
 
         log.info("Initializing job {} ({}).", jobName, jid);
 
+        if 
(jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
+                == SchedulerExecutionMode.REACTIVE) {
+            Preconditions.checkState(
+                    slotPoolServiceSchedulerFactory.getSchedulerType()
+                            == JobManagerOptions.SchedulerType.Adaptive,
+                    "Expecting Adaptive Scheduler for reactive mode");
+            log.info("Modifying job parallelism for running in reactive 
mode.");
+            for (JobVertex vertex : jobGraph.getVertices()) {
+                if (vertex.getMaxParallelism() == 
JobVertex.MAX_PARALLELISM_DEFAULT) {
+                    
vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
+                    
vertex.setMaxParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
+                } else {
+                    vertex.setParallelism(vertex.getMaxParallelism());
+                }
+            }

Review comment:
       I am wondering whether this logic should rather go into the 
`DefaultJobManagerRunnerFactory`. That's also where we pick the scheduler 
implementation. Maybe grouping this logic together makes sense (selecting the 
right scheduler implementation for the required execution mode and configuring 
the job graph accordingly). It is more of an idea to discuss. We could also 
tackle this as a follow-up.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            
.setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a 
parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new 
ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new 
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the 
cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        ParallelismTrackingSource.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        ParallelismTrackingSink.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // expect scale up to 2 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        miniClusterResource.getMiniCluster().startTaskManager();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    @Test
+    public void testScaleDownOnTaskManagerLoss() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // scale down to 1 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    private int getNumberOfConnectedTaskManagers() throws ExecutionException, 
InterruptedException {
+        return miniClusterResource
+                .getMiniCluster()
+                .requestClusterOverview()
+                .get()
+                .getNumTaskManagersConnected();
+    }
+
+    private void startAdditionalTaskManager() throws Exception {
+        miniClusterResource.getMiniCluster().startTaskManager();
+        CommonTestUtils.waitUntilCondition(
+                () -> getNumberOfConnectedTaskManagers() == 2,
+                Deadline.fromNow(Duration.ofMillis(10_000L)));
+    }
+
+    private static class ParallelismTrackingSource implements 
ParallelSourceFunction<String> {
+        private volatile boolean running = true;
+
+        @GuardedBy("instances")
+        private static CountDownLatch instances;
+
+        public static void expectInstances(int count) {
+            instances = new CountDownLatch(count);
+        }
+
+        public static void waitForInstances() throws InterruptedException {
+            internalWait(instances);
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            internalCountDown(instances);

Review comment:
       Here we read `instances` outside of a lock. Hence, we cannot be sure that 
we see the value most recently written to `instances`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
##########
@@ -58,13 +58,18 @@
     private static final Logger LOG =
             
LoggerFactory.getLogger(DefaultSlotPoolServiceSchedulerFactory.class);
 
+    private final JobManagerOptions.SchedulerType schedulerType;
+
     private final SlotPoolServiceFactory slotPoolServiceFactory;
 
     private final SchedulerNGFactory schedulerNGFactory;
 
     private DefaultSlotPoolServiceSchedulerFactory(
-            SlotPoolServiceFactory slotPoolServiceFactory, SchedulerNGFactory 
schedulerNGFactory) {
+            SlotPoolServiceFactory slotPoolServiceFactory,
+            JobManagerOptions.SchedulerType schedulerType,
+            SchedulerNGFactory schedulerNGFactory) {

Review comment:
       Maybe this can become part of the `SchedulerNGFactory`?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            
.setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a 
parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new 
ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new 
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the 
cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        ParallelismTrackingSource.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        ParallelismTrackingSink.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // expect scale up to 2 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        miniClusterResource.getMiniCluster().startTaskManager();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    @Test
+    public void testScaleDownOnTaskManagerLoss() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // scale down to 1 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    private int getNumberOfConnectedTaskManagers() throws ExecutionException, 
InterruptedException {
+        return miniClusterResource
+                .getMiniCluster()
+                .requestClusterOverview()
+                .get()
+                .getNumTaskManagersConnected();
+    }
+
+    private void startAdditionalTaskManager() throws Exception {
+        miniClusterResource.getMiniCluster().startTaskManager();
+        CommonTestUtils.waitUntilCondition(
+                () -> getNumberOfConnectedTaskManagers() == 2,
+                Deadline.fromNow(Duration.ofMillis(10_000L)));
+    }
+
+    private static class ParallelismTrackingSource implements 
ParallelSourceFunction<String> {
+        private volatile boolean running = true;
+
+        @GuardedBy("instances")
+        private static CountDownLatch instances;
+
+        public static void expectInstances(int count) {
+            instances = new CountDownLatch(count);
+        }
+
+        public static void waitForInstances() throws InterruptedException {
+            internalWait(instances);
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            internalCountDown(instances);
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect("test");
+                }
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    private static class ParallelismTrackingSink<T> extends 
RichSinkFunction<T> {
+        @GuardedBy("instances")
+        private static CountDownLatch instances;

Review comment:
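       Same here: `instances` is a non-final field that is used as its own lock 
(see the comment on `ParallelismTrackingSource.instances`).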

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for Reactive Mode (FLIP-159). */
+public class ReactiveModeITCase extends TestLogger {
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getReactiveModeConfiguration())
+                            
.setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
+                            
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    private static Configuration getReactiveModeConfiguration() {
+        final Configuration conf = new Configuration();
+
+        conf.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        conf.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        conf.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return conf;
+    }
+
+    /**
+     * Users can set maxParallelism and reactive mode must not run with a 
parallelism higher than
+     * maxParallelism.
+     */
+    @Test
+    public void testScaleLimitByMaxParallelism() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // we set maxParallelism = 1 and assert it never exceeds it
+        final DataStream<String> input =
+                env.addSource(new 
ParallelismTrackingSource()).setMaxParallelism(1);
+        input.addSink(new 
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
+
+        ParallelismTrackingSource.expectInstances(1);
+        ParallelismTrackingSink.expectInstances(1);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    /** Test that a job scales up when a TaskManager gets added to the 
cluster. */
+    @Test
+    public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        ParallelismTrackingSource.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        ParallelismTrackingSink.expectInstances(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // expect scale up to 2 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        miniClusterResource.getMiniCluster().startTaskManager();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    @Test
+    public void testScaleDownOnTaskManagerLoss() throws Exception {
+        // test preparation: ensure we have 2 TaskManagers running
+        startAdditionalTaskManager();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+        final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
+        input.addSink(new ParallelismTrackingSink<>()).getTransformation();
+
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+
+        env.executeAsync();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+
+        // scale down to 1 TaskManagers:
+        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
+
+        ParallelismTrackingSource.waitForInstances();
+        ParallelismTrackingSink.waitForInstances();
+    }
+
+    private int getNumberOfConnectedTaskManagers() throws ExecutionException, 
InterruptedException {
+        return miniClusterResource
+                .getMiniCluster()
+                .requestClusterOverview()
+                .get()
+                .getNumTaskManagersConnected();
+    }
+
+    private void startAdditionalTaskManager() throws Exception {
+        miniClusterResource.getMiniCluster().startTaskManager();
+        CommonTestUtils.waitUntilCondition(
+                () -> getNumberOfConnectedTaskManagers() == 2,
+                Deadline.fromNow(Duration.ofMillis(10_000L)));
+    }
+
+    private static class ParallelismTrackingSource implements 
ParallelSourceFunction<String> {
+        private volatile boolean running = true;
+
+        @GuardedBy("instances")
+        private static CountDownLatch instances;
+
+        public static void expectInstances(int count) {
+            instances = new CountDownLatch(count);

Review comment:
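       `instances` is annotated with `@GuardedBy("instances")`, but it is assigned here without holding that lock, i.e. it is accessed outside of its defined guard.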




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to