Repository: flink
Updated Branches:
  refs/heads/master 7e9557afe -> 124872319


[FLINK-5999] [resMgnr] Move JobLeaderIdService shut down into 
ResourceManagerRunner

The JobLeaderIdService is being created by the ResourceManagerRunner and then 
given to a
ResourceManager. Before the ResourceManager stopped the service before being 
stopped
itself. This could lead to a concurrent modification exception by a state 
changing action
executed by the actor thread. In order to avoid this concurrent modification, 
the service's
shut down is now being executed after the ResourceManager has been shut down.

This closes #3526.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12487231
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12487231
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12487231

Branch: refs/heads/master
Commit: 124872319889206b8ceaa7222c25318047102f6e
Parents: 7e9557a
Author: Till Rohrmann <[email protected]>
Authored: Mon Mar 13 15:55:02 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Mon Mar 20 18:35:58 2017 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        | 10 +--
 .../ResourceManagerConfiguration.java           | 22 +------
 .../resourcemanager/ResourceManagerRunner.java  | 37 ++++++++---
 .../ResourceManagerRuntimeServices.java         | 69 ++++++++++++++++++++
 ...urceManagerRuntimeServicesConfiguration.java | 59 +++++++++++++++++
 .../resourcemanager/ResourceManagerHATest.java  | 17 +++--
 .../ResourceManagerJobMasterTest.java           |  5 +-
 .../ResourceManagerTaskExecutorTest.java        |  7 +-
 .../slotmanager/SlotProtocolTest.java           | 10 ++-
 .../taskexecutor/TaskExecutorITCase.java        |  5 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 21 +++---
 11 files changed, 191 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index badfbe2..91fbba6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -179,13 +179,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                Exception exception = null;
 
                try {
-                       jobLeaderIdService.stop();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
-
-               try {
-                       leaderElectionService.stop();
+                       super.shutDown();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
@@ -193,7 +187,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                clearState();
 
                try {
-                       super.shutDown();
+                       leaderElectionService.stop();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index d04d852..fa75bbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ResourceManagerOptions;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
@@ -33,15 +32,12 @@ public class ResourceManagerConfiguration {
 
        private final Time timeout;
        private final Time heartbeatInterval;
-       private final Time jobTimeout;
 
        public ResourceManagerConfiguration(
                        Time timeout,
-                       Time heartbeatInterval,
-                       Time jobTimeout) {
+                       Time heartbeatInterval) {
                this.timeout = Preconditions.checkNotNull(timeout, "timeout");
                this.heartbeatInterval = 
Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval");
-               this.jobTimeout = Preconditions.checkNotNull(jobTimeout, 
"jobTimeout");
        }
 
        public Time getTimeout() {
@@ -52,10 +48,6 @@ public class ResourceManagerConfiguration {
                return heartbeatInterval;
        }
 
-       public Time getJobTimeout() {
-               return jobTimeout;
-       }
-
        // 
--------------------------------------------------------------------------
        // Static factory methods
        // 
--------------------------------------------------------------------------
@@ -81,16 +73,6 @@ public class ResourceManagerConfiguration {
                                "value " + 
AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
                }
 
-               final String strJobTimeout = 
configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
-               final Time jobTimeout;
-
-               try {
-                       jobTimeout = 
Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
-               } catch (NumberFormatException e) {
-                       throw new ConfigurationException("Could not parse the 
resource manager's job timeout " +
-                               "value " + ResourceManagerOptions.JOB_TIMEOUT + 
'.', e);
-               }
-
-               return new ResourceManagerConfiguration(timeout, 
heartbeatInterval, jobTimeout);
+               return new ResourceManagerConfiguration(timeout, 
heartbeatInterval);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 749b407..73b27b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +38,8 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
 
        private final Object lock = new Object();
 
+       private final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices;
+
        private final ResourceManager<?> resourceManager;
 
        public ResourceManagerRunner(
@@ -53,19 +54,21 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
                Preconditions.checkNotNull(metricRegistry);
 
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
-               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
-               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
+
+               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+
+               resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       resourceManagerRuntimeServicesConfiguration,
                        highAvailabilityServices,
-                       rpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+                       rpcService.getScheduledExecutor());
 
                this.resourceManager = new StandaloneResourceManager(
                        rpcService,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
-                       slotManagerFactory,
+                       resourceManagerRuntimeServices.getSlotManagerFactory(),
                        metricRegistry,
-                       jobLeaderIdService,
+                       resourceManagerRuntimeServices.getJobLeaderIdService(),
                        this);
        }
 
@@ -82,8 +85,24 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
        }
 
        private void shutDownInternally() throws Exception {
+               Exception exception = null;
                synchronized (lock) {
-                       resourceManager.shutDown();
+
+                       try {
+                               resourceManager.shutDown();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       try {
+                               resourceManagerRuntimeServices.shutDown();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       if (exception != null) {
+                               ExceptionUtils.rethrow(exception, "Error while 
shutting down the resource manager runner.");
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
new file mode 100644
index 0000000..56edde4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Container class for the {@link ResourceManager} services.
+ */
+public class ResourceManagerRuntimeServices {
+
+       private final SlotManagerFactory slotManagerFactory;
+       private final JobLeaderIdService jobLeaderIdService;
+
+       public ResourceManagerRuntimeServices(SlotManagerFactory 
slotManagerFactory, JobLeaderIdService jobLeaderIdService) {
+               this.slotManagerFactory = 
Preconditions.checkNotNull(slotManagerFactory);
+               this.jobLeaderIdService = 
Preconditions.checkNotNull(jobLeaderIdService);
+       }
+
+       public SlotManagerFactory getSlotManagerFactory() {
+               return slotManagerFactory;
+       }
+
+       public JobLeaderIdService getJobLeaderIdService() {
+               return jobLeaderIdService;
+       }
+
+       // -------------------- Lifecycle methods 
-----------------------------------
+
+       public void shutDown() throws Exception {
+               jobLeaderIdService.stop();
+       }
+
+       // -------------------- Static methods 
--------------------------------------
+
+       public static ResourceManagerRuntimeServices fromConfiguration(
+                       ResourceManagerRuntimeServicesConfiguration 
configuration,
+                       HighAvailabilityServices highAvailabilityServices,
+                       ScheduledExecutor scheduledExecutor) throws Exception {
+
+               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
+               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
+                       highAvailabilityServices,
+                       scheduledExecutor,
+                       configuration.getJobTimeout());
+
+               return new ResourceManagerRuntimeServices(slotManagerFactory, 
jobLeaderIdService);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
new file mode 100644
index 0000000..6de5f4d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Configuration class for the {@link ResourceManagerRuntimeServices} class.
+ */
+public class ResourceManagerRuntimeServicesConfiguration {
+
+       private final Time jobTimeout;
+
+       public ResourceManagerRuntimeServicesConfiguration(Time jobTimeout) {
+               this.jobTimeout = Preconditions.checkNotNull(jobTimeout);
+       }
+
+       public Time getJobTimeout() {
+               return jobTimeout;
+       }
+
+       // ---------------------------- Static methods 
----------------------------------
+
+       public static ResourceManagerRuntimeServicesConfiguration 
fromConfiguration(Configuration configuration) throws ConfigurationException {
+
+               final String strJobTimeout = 
configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
+               final Time jobTimeout;
+
+               try {
+                       jobTimeout = 
Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
+               } catch (NumberFormatException e) {
+                       throw new ConfigurationException("Could not parse the 
resource manager's job timeout " +
+                               "value " + ResourceManagerOptions.JOB_TIMEOUT + 
'.', e);
+               }
+
+               return new 
ResourceManagerRuntimeServicesConfiguration(jobTimeout);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 58dedc3..1aa799b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -48,14 +48,17 @@ public class ResourceManagerHATest {
 
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
-                       Time.seconds(5L),
-                       Time.minutes(5L));
+                       Time.seconds(5L));
+
+               ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = new 
ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L));
+               ResourceManagerRuntimeServices resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       resourceManagerRuntimeServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
                SlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
-               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final ResourceManager resourceManager =
@@ -65,7 +68,7 @@ public class ResourceManagerHATest {
                                highAvailabilityServices,
                                slotManagerFactory,
                                metricRegistry,
-                               jobLeaderIdService,
+                               
resourceManagerRuntimeServices.getJobLeaderIdService(),
                                testingFatalErrorHandler);
                resourceManager.start();
                // before grant leadership, resourceManager's leaderId is null

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 031f76e..fb166d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -195,14 +195,13 @@ public class ResourceManagerJobMasterTest {
 
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
-                       Time.seconds(5L),
-                       Time.minutes(5L));
+                       Time.seconds(5L));
                SlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+                       Time.minutes(5L));
 
                ResourceManager resourceManager = new StandaloneResourceManager(
                        rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 4456235..0a1addb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -148,14 +148,13 @@ public class ResourceManagerTaskExecutorTest {
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
-                       Time.seconds(5L),
-                       Time.minutes(5L));
+                       Time.seconds(5L));
+
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
-
+                       Time.minutes(5L));
 
                StandaloneResourceManager resourceManager =
                        new StandaloneResourceManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 1e5edbe..ea660f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -109,13 +109,12 @@ public class SlotProtocolTest extends TestLogger {
 
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
-                       Time.seconds(5L),
-                       Time.minutes(5L));
+                       Time.seconds(5L));
 
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        testingHaServices,
                        testRpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+                       Time.seconds(5L));
 
                final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                SpiedResourceManager resourceManager =
@@ -217,13 +216,12 @@ public class SlotProtocolTest extends TestLogger {
 
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
-                       Time.seconds(5L),
-                       Time.minutes(5L));
+                       Time.seconds(5L));
 
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        testingHaServices,
                        testRpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+                       Time.seconds(5L));
 
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                ResourceManager<ResourceID> resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 16edbf7..43f33a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -97,13 +97,12 @@ public class TaskExecutorITCase {
                TestingSerialRpcService rpcService = new 
TestingSerialRpcService();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.milliseconds(500L),
-                       Time.milliseconds(500L),
-                       Time.minutes(5L));
+                       Time.milliseconds(500L));
                SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        testingHAServices,
                        rpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+                       Time.minutes(5L));
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
                HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class, RETURNS_MOCKS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 6fb7c86..7a0dbbe 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -35,11 +35,10 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -64,8 +63,8 @@ import java.io.ObjectInputStream;
  * <p>The lifetime of the YARN application bound to that of the Flink job. 
Other
  * YARN Application Master implementations are for example the YARN session.
  * 
- * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmaster.JobManagerRunner}
- * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ * It starts actor system and the actors for {@link JobManagerRunner}
+ * and {@link YarnResourceManager}.
  *
  * The JobMasnagerRunner start a {@link 
org.apache.flink.runtime.jobmaster.JobMaster}
  * JobMaster handles Flink job execution, while the YarnResourceManager 
handles container
@@ -193,20 +192,20 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
 
        private ResourceManager<?> createResourceManager(Configuration config) 
throws Exception {
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(config);
-               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
-               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
+               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
+               final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       resourceManagerRuntimeServicesConfiguration,
                        haServices,
-                       commonRpcService.getScheduledExecutor(),
-                       resourceManagerConfiguration.getJobTimeout());
+                       commonRpcService.getScheduledExecutor());
 
                return new YarnResourceManager(config,
                                ENV,
                                commonRpcService,
                                resourceManagerConfiguration,
                                haServices,
-                               slotManagerFactory,
+                               
resourceManagerRuntimeServices.getSlotManagerFactory(),
                                metricRegistry,
-                               jobLeaderIdService,
+                               
resourceManagerRuntimeServices.getJobLeaderIdService(),
                                this);
        }
 

Reply via email to