[FLINK-8637] [flip6] Use JobManagerSharedServices to pass in services to 
JobMaster

Pass a JobManagerSharedServices instance to the JobMaster instead of the
each service individually.

This closes #5457.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24c30878
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24c30878
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24c30878

Branch: refs/heads/master
Commit: 24c30878ed6f6ed1599a5ec23362055e0e88916f
Parents: f719bef
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Feb 12 00:51:48 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Feb 13 08:50:51 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  12 +-
 .../dispatcher/StandaloneDispatcher.java        |   6 +-
 .../entrypoint/JobClusterEntrypoint.java        |  16 +-
 .../runtime/jobmaster/JobManagerRunner.java     |  27 ++-
 .../runtime/jobmaster/JobManagerServices.java   | 163 ----------------
 .../jobmaster/JobManagerSharedServices.java     | 189 +++++++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      |  31 ++-
 .../minicluster/MiniClusterJobDispatcher.java   |  12 +-
 .../backpressure/BackPressureStatsTracker.java  |  16 ++
 .../VoidBackPressureStatsTracker.java           |  10 +
 .../runtime/dispatcher/DispatcherTest.java      |   6 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |  29 +--
 .../TestingJobManagerSharedServicesBuilder.java | 101 ++++++++++
 13 files changed, 384 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index da63765..c83dce0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -89,7 +89,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        private final HighAvailabilityServices highAvailabilityServices;
        private final ResourceManagerGateway resourceManagerGateway;
-       private final JobManagerServices jobManagerServices;
+       private final JobManagerSharedServices jobManagerSharedServices;
        private final HeartbeatServices heartbeatServices;
        private final BlobServer blobServer;
        private final MetricRegistry metricRegistry;
@@ -127,7 +127,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                this.metricRegistry = 
Preconditions.checkNotNull(metricRegistry);
                this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
 
-               this.jobManagerServices = JobManagerServices.fromConfiguration(
+               this.jobManagerSharedServices = 
JobManagerSharedServices.fromConfiguration(
                        configuration,
                        this.blobServer);
 
@@ -155,7 +155,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                clearState();
 
                try {
-                       jobManagerServices.shutdown();
+                       jobManagerSharedServices.shutdown();
                } catch (Throwable t) {
                        exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
                }
@@ -234,7 +234,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                        highAvailabilityServices,
                                        heartbeatServices,
                                        blobServer,
-                                       jobManagerServices,
+                                       jobManagerSharedServices,
                                        metricRegistry,
                                        new 
DispatcherOnCompleteActions(jobGraph.getJobID()),
                                        fatalErrorHandler);
@@ -545,7 +545,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                BlobServer blobServer,
-               JobManagerServices jobManagerServices,
+               JobManagerSharedServices jobManagerSharedServices,
                MetricRegistry metricRegistry,
                OnCompletionActions onCompleteActions,
                FatalErrorHandler fatalErrorHandler) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5533d59..6592e98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -76,7 +76,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        BlobServer blobServer,
-                       JobManagerServices jobManagerServices,
+                       JobManagerSharedServices jobManagerSharedServices,
                        MetricRegistry metricRegistry,
                        OnCompletionActions onCompleteActions,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -89,7 +89,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        highAvailabilityServices,
                        heartbeatServices,
                        blobServer,
-                       jobManagerServices,
+                       jobManagerSharedServices,
                        metricRegistry,
                        onCompleteActions,
                        fatalErrorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 1ff303b..e03c318 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
@@ -73,7 +73,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
 
        private ResourceManager<?> resourceManager;
 
-       private JobManagerServices jobManagerServices;
+       private JobManagerSharedServices jobManagerSharedServices;
 
        private JobMasterRestEndpoint jobMasterRestEndpoint;
 
@@ -98,7 +98,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry) throws Exception {
 
-               jobManagerServices = 
JobManagerServices.fromConfiguration(configuration, blobServer);
+               jobManagerSharedServices = 
JobManagerSharedServices.fromConfiguration(configuration, blobServer);
 
                resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
@@ -154,7 +154,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        ResourceID.generate(),
                        rpcService,
                        highAvailabilityServices,
-                       jobManagerServices,
+                       jobManagerSharedServices,
                        heartbeatServices,
                        blobServer,
                        metricRegistry,
@@ -204,7 +204,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        ResourceID resourceId,
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
-                       JobManagerServices jobManagerServices,
+                       JobManagerSharedServices jobManagerSharedServices,
                        HeartbeatServices heartbeatServices,
                        BlobServer blobServer,
                        MetricRegistry metricRegistry,
@@ -221,7 +221,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        highAvailabilityServices,
                        heartbeatServices,
                        blobServer,
-                       jobManagerServices,
+                       jobManagerSharedServices,
                        metricRegistry,
                        new TerminatingOnCompleteActions(jobGraph.getJobID()),
                        fatalErrorHandler,
@@ -252,9 +252,9 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        }
                }
 
-               if (jobManagerServices != null) {
+               if (jobManagerSharedServices != null) {
                        try {
-                               jobManagerServices.shutdown();
+                               jobManagerSharedServices.shutdown();
                        } catch (Throwable t) {
                                exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 06700d9..8468f28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -84,7 +84,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
        /** Leader election for this job. */
        private final LeaderElectionService leaderElectionService;
 
-       private final JobManagerServices jobManagerServices;
+       private final JobManagerSharedServices jobManagerSharedServices;
 
        private final JobMaster jobManager;
 
@@ -101,7 +101,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
         * Exceptions that occur while creating the JobManager or 
JobManagerRunner are directly
         * thrown and not reported to the given {@code FatalErrorHandler}.
         *
-        * <p>This JobManagerRunner assumes that it owns the given {@code 
JobManagerServices}.
+        * <p>This JobManagerRunner assumes that it owns the given {@code 
JobManagerSharedServices}.
         * It will shut them down on error and on calls to {@link #shutdown()}.
         *
         * @throws Exception Thrown if the runner cannot be set up, because 
either one of the
@@ -115,7 +115,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final HighAvailabilityServices haServices,
                        final HeartbeatServices heartbeatServices,
                        final BlobServer blobServer,
-                       final JobManagerServices jobManagerServices,
+                       final JobManagerSharedServices jobManagerSharedServices,
                        final MetricRegistry metricRegistry,
                        final OnCompletionActions toNotifyOnComplete,
                        final FatalErrorHandler errorHandler,
@@ -128,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        this.jobGraph = checkNotNull(jobGraph);
                        this.toNotifyOnComplete = 
checkNotNull(toNotifyOnComplete);
                        this.errorHandler = checkNotNull(errorHandler);
-                       this.jobManagerServices = 
checkNotNull(jobManagerServices);
+                       this.jobManagerSharedServices = 
checkNotNull(jobManagerSharedServices);
 
                        checkArgument(jobGraph.getNumberOfVertices() > 0, "The 
given job is empty");
 
@@ -137,7 +137,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        this.jobManagerMetricGroup = jobManagerMetrics;
 
                        // libraries and class loader first
-                       final BlobLibraryCacheManager libraryCacheManager = 
jobManagerServices.libraryCacheManager;
+                       final LibraryCacheManager libraryCacheManager = 
jobManagerSharedServices.getLibraryCacheManager();
                        try {
                                libraryCacheManager.registerJob(
                                                jobGraph.getJobID(), 
jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
@@ -154,6 +154,8 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        this.runningJobsRegistry = 
haServices.getRunningJobsRegistry();
                        this.leaderElectionService = 
haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
+                       this.timeout = jobManagerSharedServices.getTimeout();
+
                        // now start the JobManager
                        this.jobManager = new JobMaster(
                                rpcService,
@@ -161,20 +163,15 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                jobGraph,
                                configuration,
                                haServices,
+                               jobManagerSharedServices,
                                heartbeatServices,
-                               jobManagerServices.executorService,
                                blobServer,
-                               jobManagerServices.restartStrategyFactory,
-                               jobManagerServices.rpcAskTimeout,
                                jobManagerMetrics,
                                this,
                                this,
                                userCodeLoader,
                                restAddress,
-                               metricRegistry.getMetricQueryServicePath(),
-                               
jobManagerServices.backPressureStatsTrackerImpl);
-
-                       this.timeout = jobManagerServices.rpcAskTimeout;
+                               metricRegistry.getMetricQueryServicePath());
                }
                catch (Throwable t) {
                        // clean up everything
@@ -378,7 +375,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                                                
onFatalError(new Exception("Could not start the job manager.", throwable));
                                                        }
                                                },
-                                               
jobManagerServices.executorService);
+                                               
jobManagerSharedServices.getScheduledExecutorService());
                                } catch (Exception e) {
                                        onFatalError(new Exception("Could not 
start the job manager.", e));
                                }
@@ -405,7 +402,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                                onFatalError(new 
Exception("Could not start the job manager.", throwable));
                                        }
                                },
-                               jobManagerServices.executorService);
+                               
jobManagerSharedServices.getScheduledExecutorService());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
deleted file mode 100644
index 2295b5b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.util.Hardware;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Utility class to hold all auxiliary services used by the {@link JobMaster}.
- */
-public class JobManagerServices {
-
-       public final ScheduledExecutorService executorService;
-
-       public final BlobLibraryCacheManager libraryCacheManager;
-
-       public final RestartStrategyFactory restartStrategyFactory;
-
-       public final Time rpcAskTimeout;
-
-       private final StackTraceSampleCoordinator stackTraceSampleCoordinator;
-       public final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;
-
-       public JobManagerServices(
-                       ScheduledExecutorService executorService,
-                       BlobLibraryCacheManager libraryCacheManager,
-                       RestartStrategyFactory restartStrategyFactory,
-                       Time rpcAskTimeout,
-                       StackTraceSampleCoordinator stackTraceSampleCoordinator,
-                       BackPressureStatsTrackerImpl 
backPressureStatsTrackerImpl) {
-
-               this.executorService = checkNotNull(executorService);
-               this.libraryCacheManager = checkNotNull(libraryCacheManager);
-               this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
-               this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
-               this.stackTraceSampleCoordinator = 
checkNotNull(stackTraceSampleCoordinator);
-               this.backPressureStatsTrackerImpl = 
checkNotNull(backPressureStatsTrackerImpl);
-
-               executorService.scheduleWithFixedDelay(
-                       backPressureStatsTrackerImpl::cleanUpOperatorStatsCache,
-                       backPressureStatsTrackerImpl.getCleanUpInterval(),
-                       backPressureStatsTrackerImpl.getCleanUpInterval(),
-                       TimeUnit.MILLISECONDS);
-       }
-
-       /**
-        * Shutdown the {@link JobMaster} services.
-        *
-        * <p>This method makes sure all services are closed or shut down, even 
when an exception occurred
-        * in the shutdown of one component. The first encountered exception is 
thrown, with successive
-        * exceptions added as suppressed exceptions.
-        *
-        * @throws Exception The first Exception encountered during shutdown.
-        */
-       public void shutdown() throws Exception {
-               Throwable firstException = null;
-
-               try {
-                       executorService.shutdownNow();
-               } catch (Throwable t) {
-                       firstException = t;
-               }
-
-               libraryCacheManager.shutdown();
-
-               stackTraceSampleCoordinator.shutDown();
-               backPressureStatsTrackerImpl.shutDown();
-
-               if (firstException != null) {
-                       ExceptionUtils.rethrowException(firstException, "Error 
while shutting down JobManager services");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Creating the components from a configuration
-       // 
------------------------------------------------------------------------
-
-       public static JobManagerServices fromConfiguration(
-                       Configuration config,
-                       BlobServer blobServer) throws Exception {
-
-               checkNotNull(config);
-               checkNotNull(blobServer);
-
-               final String classLoaderResolveOrder =
-                       config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
-
-               final String alwaysParentFirstLoaderString =
-                       
config.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER);
-               final String[] alwaysParentFirstLoaderPatterns = 
alwaysParentFirstLoaderString.split(";");
-
-               final BlobLibraryCacheManager libraryCacheManager =
-                       new BlobLibraryCacheManager(
-                               blobServer,
-                               
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
-                               alwaysParentFirstLoaderPatterns);
-
-               final FiniteDuration timeout;
-               try {
-                       timeout = AkkaUtils.getTimeout(config);
-               } catch (NumberFormatException e) {
-                       throw new 
IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
-               }
-
-               final ScheduledExecutorService futureExecutor = 
Executors.newScheduledThreadPool(
-                               Hardware.getNumberCPUCores(),
-                               new ExecutorThreadFactory("jobmanager-future"));
-
-               final StackTraceSampleCoordinator stackTraceSampleCoordinator =
-                       new StackTraceSampleCoordinator(futureExecutor, 
timeout.toMillis());
-               final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl 
= new BackPressureStatsTrackerImpl(
-                       stackTraceSampleCoordinator,
-                       
config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
-                       config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
-                       
config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
-                       
Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));
-
-               return new JobManagerServices(
-                       futureExecutor,
-                       libraryCacheManager,
-                       
RestartStrategyFactory.createRestartStrategyFactory(config),
-                       Time.of(timeout.length(), timeout.unit()),
-                       stackTraceSampleCoordinator,
-                       backPressureStatsTrackerImpl);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
new file mode 100644
index 0000000..d028f88
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class which holds all auxiliary shared services used by the {@link 
JobMaster}.
+ * Consequently, the {@link JobMaster} should never shut these services down.
+ */
+public class JobManagerSharedServices {
+
+       private final ScheduledExecutorService scheduledExecutorService;
+
+       private final LibraryCacheManager libraryCacheManager;
+
+       private final RestartStrategyFactory restartStrategyFactory;
+
+       private final StackTraceSampleCoordinator stackTraceSampleCoordinator;
+
+       private final BackPressureStatsTracker backPressureStatsTracker;
+
+       private final Time timeout;
+
+       public JobManagerSharedServices(
+                       ScheduledExecutorService scheduledExecutorService,
+                       LibraryCacheManager libraryCacheManager,
+                       RestartStrategyFactory restartStrategyFactory,
+                       StackTraceSampleCoordinator stackTraceSampleCoordinator,
+                       BackPressureStatsTracker backPressureStatsTracker,
+                       Time backPressureStatsTrackerCleanupInterval,
+                       Time timeout) {
+
+               this.scheduledExecutorService = 
checkNotNull(scheduledExecutorService);
+               this.libraryCacheManager = checkNotNull(libraryCacheManager);
+               this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
+               this.stackTraceSampleCoordinator = 
checkNotNull(stackTraceSampleCoordinator);
+               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker);
+               this.timeout = checkNotNull(timeout);
+
+               scheduledExecutorService.scheduleWithFixedDelay(
+                       backPressureStatsTracker::cleanUpOperatorStatsCache,
+                       
backPressureStatsTrackerCleanupInterval.toMilliseconds(),
+                       
backPressureStatsTrackerCleanupInterval.toMilliseconds(),
+                       TimeUnit.MILLISECONDS);
+       }
+
+       public ScheduledExecutorService getScheduledExecutorService() {
+               return scheduledExecutorService;
+       }
+
+       public LibraryCacheManager getLibraryCacheManager() {
+               return libraryCacheManager;
+       }
+
+       public RestartStrategyFactory getRestartStrategyFactory() {
+               return restartStrategyFactory;
+       }
+
+       public BackPressureStatsTracker getBackPressureStatsTracker() {
+               return backPressureStatsTracker;
+       }
+
+       public Time getTimeout() {
+               return timeout;
+       }
+
+       /**
+        * Shutdown the {@link JobMaster} services.
+        *
+        * <p>This method makes sure all services are closed or shut down, even 
when an exception occurred
+        * in the shutdown of one component. The first encountered exception is 
thrown, with successive
+        * exceptions added as suppressed exceptions.
+        *
+        * @throws Exception The first Exception encountered during shutdown.
+        */
+       public void shutdown() throws Exception {
+               Throwable firstException = null;
+
+               try {
+                       scheduledExecutorService.shutdownNow();
+               } catch (Throwable t) {
+                       firstException = t;
+               }
+
+               libraryCacheManager.shutdown();
+               stackTraceSampleCoordinator.shutDown();
+               backPressureStatsTracker.shutDown();
+
+               if (firstException != null) {
+                       ExceptionUtils.rethrowException(firstException, "Error 
while shutting down JobManager services");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Creating the components from a configuration
+       // 
------------------------------------------------------------------------
+
+       public static JobManagerSharedServices fromConfiguration(
+                       Configuration config,
+                       BlobServer blobServer) throws Exception {
+
+               checkNotNull(config);
+               checkNotNull(blobServer);
+
+               final String classLoaderResolveOrder =
+                       config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+
+               final String alwaysParentFirstLoaderString =
+                       
config.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER);
+               final String[] alwaysParentFirstLoaderPatterns = 
alwaysParentFirstLoaderString.split(";");
+
+               final BlobLibraryCacheManager libraryCacheManager =
+                       new BlobLibraryCacheManager(
+                               blobServer,
+                               
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
+                               alwaysParentFirstLoaderPatterns);
+
+               final FiniteDuration timeout;
+               try {
+                       timeout = AkkaUtils.getTimeout(config);
+               } catch (NumberFormatException e) {
+                       throw new 
IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
+               }
+
+               final ScheduledExecutorService futureExecutor = 
Executors.newScheduledThreadPool(
+                               Hardware.getNumberCPUCores(),
+                               new ExecutorThreadFactory("jobmanager-future"));
+
+               final StackTraceSampleCoordinator stackTraceSampleCoordinator =
+                       new StackTraceSampleCoordinator(futureExecutor, 
timeout.toMillis());
+               final int cleanUpInterval = 
config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
+               final BackPressureStatsTrackerImpl backPressureStatsTracker = 
new BackPressureStatsTrackerImpl(
+                       stackTraceSampleCoordinator,
+                       cleanUpInterval,
+                       config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+                       
config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
+                       
Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+
+               return new JobManagerSharedServices(
+                       futureExecutor,
+                       libraryCacheManager,
+                       
RestartStrategyFactory.createRestartStrategyFactory(config),
+                       stackTraceSampleCoordinator,
+                       backPressureStatsTracker,
+                       Time.milliseconds(cleanUpInterval),
+                       Time.milliseconds(timeout.toMillis()));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a8f1154..bf24dc6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -171,7 +171,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        private final HeartbeatManager<Void, Void> 
resourceManagerHeartbeatManager;
 
        /** The execution context which is used to execute futures. */
-       private final Executor executor;
+       private final ScheduledExecutorService scheduledExecutorService;
 
        private final OnCompletionActions jobCompletionActions;
 
@@ -214,18 +214,15 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        JobGraph jobGraph,
                        Configuration configuration,
                        HighAvailabilityServices highAvailabilityService,
+                       JobManagerSharedServices jobManagerSharedServices,
                        HeartbeatServices heartbeatServices,
-                       ScheduledExecutorService executor,
                        BlobServer blobServer,
-                       RestartStrategyFactory restartStrategyFactory,
-                       Time rpcAskTimeout,
                        @Nullable JobManagerMetricGroup jobManagerMetricGroup,
                        OnCompletionActions jobCompletionActions,
                        FatalErrorHandler errorHandler,
                        ClassLoader userCodeLoader,
                        @Nullable String restAddress,
-                       @Nullable String metricQueryServicePath,
-                       BackPressureStatsTracker backPressureStatsTracker) 
throws Exception {
+                       @Nullable String metricQueryServicePath) throws 
Exception {
 
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
 
@@ -233,10 +230,10 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                this.resourceId = checkNotNull(resourceId);
                this.jobGraph = checkNotNull(jobGraph);
-               this.rpcTimeout = rpcAskTimeout;
+               this.rpcTimeout = jobManagerSharedServices.getTimeout();
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityService);
                this.blobServer = checkNotNull(blobServer);
-               this.executor = checkNotNull(executor);
+               this.scheduledExecutorService = 
jobManagerSharedServices.getScheduledExecutorService();
                this.jobCompletionActions = checkNotNull(jobCompletionActions);
                this.errorHandler = checkNotNull(errorHandler);
                this.userCodeLoader = checkNotNull(userCodeLoader);
@@ -271,7 +268,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                final RestartStrategy restartStrategy = 
(restartStrategyConfiguration != null) ?
                                
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
-                               restartStrategyFactory.createRestartStrategy();
+                               
jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
 
                log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobName, jid);
 
@@ -294,12 +291,12 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        null,
                        jobGraph,
                        configuration,
-                       executor,
-                       executor,
+                       scheduledExecutorService,
+                       scheduledExecutorService,
                        slotPool.getSlotProvider(),
                        userCodeLoader,
                        checkpointRecoveryFactory,
-                       rpcAskTimeout,
+                       rpcTimeout,
                        restartStrategy,
                        jobMetricGroup,
                        -1,
@@ -316,7 +313,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        .orElse(FutureUtils.completedExceptionally(new 
JobMasterException("The JobMaster has not been started with a REST 
endpoint.")));
 
                this.metricQueryServicePath = metricQueryServicePath;
-               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker);
+               this.backPressureStatsTracker = 
checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -813,7 +810,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        @Override
        public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time 
timeout) {
-               return CompletableFuture.supplyAsync(() -> 
WebMonitorUtils.createDetailsForJob(executionGraph), executor);
+               return CompletableFuture.supplyAsync(() -> 
WebMonitorUtils.createDetailsForJob(executionGraph), scheduledExecutorService);
        }
 
        @Override
@@ -963,7 +960,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                // start scheduling job in another thread
-               executor.execute(
+               scheduledExecutorService.execute(
                        () -> {
                                try {
                                        executionGraph.scheduleForExecution();
@@ -1034,7 +1031,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                if (newJobStatus.isGloballyTerminalState()) {
                        final ArchivedExecutionGraph archivedExecutionGraph = 
ArchivedExecutionGraph.createFrom(executionGraph);
-                       executor.execute(() -> 
jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
+                       scheduledExecutorService.execute(() -> 
jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
                }
        }
 
@@ -1069,7 +1066,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                getFencingToken(),
                                resourceManagerAddress,
                                resourceManagerId,
-                               executor);
+                               scheduledExecutorService);
 
                        resourceManagerConnection.start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index e5ee9d7..6fba534 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -79,7 +79,7 @@ public class MiniClusterJobDispatcher {
        private final BlobServer blobServer;
 
        /** all the services that the JobManager needs, such as BLOB service, 
factories, etc. */
-       private final JobManagerServices jobManagerServices;
+       private final JobManagerSharedServices jobManagerSharedServices;
 
        /** Registry for all metrics in the mini cluster. */
        private final MetricRegistry metricRegistry;
@@ -155,7 +155,7 @@ public class MiniClusterJobDispatcher {
                this.numJobManagers = numJobManagers;
 
                LOG.info("Creating JobMaster services");
-               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config, blobServer);
+               this.jobManagerSharedServices = 
JobManagerSharedServices.fromConfiguration(config, blobServer);
        }
 
        // 
------------------------------------------------------------------------
@@ -191,9 +191,9 @@ public class MiniClusterJobDispatcher {
                                        }
                                }
 
-                               // shut down the JobManagerServices
+                               // shut down the JobManagerSharedServices
                                try {
-                                       jobManagerServices.shutdown();
+                                       jobManagerSharedServices.shutdown();
                                } catch (Throwable throwable) {
                                        exception = 
ExceptionUtils.firstOrSuppressed(throwable, exception);
                                }
@@ -285,7 +285,7 @@ public class MiniClusterJobDispatcher {
                                        haServices,
                                        heartbeatServices,
                                        blobServer,
-                                       jobManagerServices,
+                                       jobManagerSharedServices,
                                        metricRegistry,
                                        onCompletion,
                                        errorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
index 356c8b8..7c447c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.util.FlinkException;
 
 import java.util.Optional;
 
@@ -35,4 +36,19 @@ public interface BackPressureStatsTracker {
         * @return Back pressure statistics for an operator
         */
        Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex);
+
+       /**
+        * Cleans up the operator stats cache if it contains timed out entries.
+        *
+        * <p>The Guava cache only evicts as maintenance during normal 
operations.
+        * If this handler is inactive, it will never be cleaned.
+        */
+       void cleanUpOperatorStatsCache();
+
+       /**
+        * Shuts the BackPressureStatsTracker down.
+        *
+        * @throws FlinkException if the BackPressureStatsTracker could not be 
shut down
+        */
+       void shutDown() throws FlinkException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
index 2a55d47..567af66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
@@ -32,5 +32,15 @@ public enum VoidBackPressureStatsTracker implements 
BackPressureStatsTracker {
                public Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
                        return Optional.empty();
                }
+
+               @Override
+               public void cleanUpOperatorStatsCache() {
+                       // nothing to do
+               }
+
+               @Override
+               public void shutDown() {
+                       // nothing to do
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 0d58f62..8fe3c3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -392,7 +392,7 @@ public class DispatcherTest extends TestLogger {
                                HighAvailabilityServices 
highAvailabilityServices,
                                HeartbeatServices heartbeatServices,
                                BlobServer blobServer,
-                               JobManagerServices jobManagerServices,
+                               JobManagerSharedServices 
jobManagerSharedServices,
                                MetricRegistry metricRegistry,
                                OnCompletionActions onCompleteActions,
                                FatalErrorHandler fatalErrorHandler) throws 
Exception {
@@ -406,7 +406,7 @@ public class DispatcherTest extends TestLogger {
                                highAvailabilityServices,
                                heartbeatServices,
                                blobServer,
-                               jobManagerServices,
+                               jobManagerSharedServices,
                                metricRegistry,
                                onCompleteActions,
                                fatalErrorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 027ba5e..29f8b38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -40,7 +39,6 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -55,7 +53,6 @@ import org.junit.experimental.categories.Category;
 
 import java.net.URL;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -104,6 +101,11 @@ public class JobMasterTest extends TestLogger {
                final JobGraph jobGraph = new JobGraph();
 
                Configuration configuration = new Configuration();
+
+               final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder()
+                       .setTimeout(testingTimeout)
+                       .build();
+
                try (BlobServer blobServer = new BlobServer(configuration, new 
VoidBlobStore())) {
                        blobServer.start();
 
@@ -113,18 +115,15 @@ public class JobMasterTest extends TestLogger {
                                jobGraph,
                                configuration,
                                haServices,
+                               jobManagerSharedServices,
                                heartbeatServices,
-                               Executors.newScheduledThreadPool(1),
                                blobServer,
-                               new 
NoRestartStrategy.NoRestartStrategyFactory(),
-                               testingTimeout,
                                null,
                                new NoOpOnCompletionActions(),
                                testingFatalErrorHandler,
                                FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()),
                                null,
-                               null,
-                               VoidBackPressureStatsTracker.INSTANCE);
+                               null);
 
                        CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
@@ -152,6 +151,7 @@ public class JobMasterTest extends TestLogger {
                        testingFatalErrorHandler.rethrowError();
 
                } finally {
+                       jobManagerSharedServices.shutdown();
                        rpc.stopService();
                }
        }
@@ -203,6 +203,11 @@ public class JobMasterTest extends TestLogger {
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                Configuration configuration = new Configuration();
+
+               final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder()
+                       .setTimeout(testingTimeout)
+                       .build();
+
                try (BlobServer blobServer = new BlobServer(configuration, new 
VoidBlobStore())) {
                        blobServer.start();
 
@@ -212,18 +217,15 @@ public class JobMasterTest extends TestLogger {
                                jobGraph,
                                configuration,
                                haServices,
+                               jobManagerSharedServices,
                                heartbeatServices,
-                               Executors.newScheduledThreadPool(1),
                                blobServer,
-                               new 
NoRestartStrategy.NoRestartStrategyFactory(),
-                               testingTimeout,
                                null,
                                new NoOpOnCompletionActions(),
                                testingFatalErrorHandler,
                                FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()),
                                null,
-                               null,
-                               VoidBackPressureStatsTracker.INSTANCE);
+                               null);
 
                        CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
@@ -251,6 +253,7 @@ public class JobMasterTest extends TestLogger {
                        testingFatalErrorHandler.rethrowError();
 
                } finally {
+                       jobManagerSharedServices.shutdown();
                        rpc.stopService();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
new file mode 100644
index 0000000..74b97a4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Builder for the {@link JobManagerSharedServices}.
+ */
+public class TestingJobManagerSharedServicesBuilder {
+
+       private ScheduledExecutorService scheduledExecutorService;
+
+       private LibraryCacheManager libraryCacheManager;
+
+       private RestartStrategyFactory restartStrategyFactory;
+
+       private StackTraceSampleCoordinator stackTraceSampleCoordinator;
+
+       private BackPressureStatsTracker backPressureStatsTracker;
+
+       private Time timeout;
+
+       public TestingJobManagerSharedServicesBuilder() {
+               scheduledExecutorService = TestingUtils.defaultExecutor();
+               libraryCacheManager = mock(LibraryCacheManager.class);
+               restartStrategyFactory = new 
NoRestartStrategy.NoRestartStrategyFactory();
+               stackTraceSampleCoordinator = 
mock(StackTraceSampleCoordinator.class);
+               backPressureStatsTracker = 
VoidBackPressureStatsTracker.INSTANCE;
+       }
+
+       public TestingJobManagerSharedServicesBuilder 
setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+               this.scheduledExecutorService = scheduledExecutorService;
+               return this;
+       }
+
+       public TestingJobManagerSharedServicesBuilder 
setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
+               this.libraryCacheManager = libraryCacheManager;
+               return this;
+
+       }
+
+       public TestingJobManagerSharedServicesBuilder 
setRestartStrategyFactory(RestartStrategyFactory restartStrategyFactory) {
+               this.restartStrategyFactory = restartStrategyFactory;
+               return this;
+       }
+
+       public TestingJobManagerSharedServicesBuilder 
setStackTraceSampleCoordinator(StackTraceSampleCoordinator 
stackTraceSampleCoordinator) {
+               this.stackTraceSampleCoordinator = stackTraceSampleCoordinator;
+               return this;
+       }
+
+       public TestingJobManagerSharedServicesBuilder 
setBackPressureStatsTracker(BackPressureStatsTracker backPressureStatsTracker) {
+               this.backPressureStatsTracker = backPressureStatsTracker;
+               return this;
+
+       }
+
+       public TestingJobManagerSharedServicesBuilder setTimeout(Time timeout) {
+               this.timeout = timeout;
+               return this;
+       }
+
+       public JobManagerSharedServices build() {
+               return new JobManagerSharedServices(
+                       scheduledExecutorService,
+                       libraryCacheManager,
+                       restartStrategyFactory,
+                       stackTraceSampleCoordinator,
+                       backPressureStatsTracker,
+                       timeout,
+                       timeout);
+       }
+}

Reply via email to