[FLINK-8637] [flip6] Use JobManagerSharedServices to pass in services to JobMaster
Pass a JobManagerSharedServices instance to the JobMaster instead of each service individually. This closes #5457. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24c30878 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24c30878 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24c30878 Branch: refs/heads/master Commit: 24c30878ed6f6ed1599a5ec23362055e0e88916f Parents: f719bef Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Feb 12 00:51:48 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Feb 13 08:50:51 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 12 +- .../dispatcher/StandaloneDispatcher.java | 6 +- .../entrypoint/JobClusterEntrypoint.java | 16 +- .../runtime/jobmaster/JobManagerRunner.java | 27 ++- .../runtime/jobmaster/JobManagerServices.java | 163 ---------------- .../jobmaster/JobManagerSharedServices.java | 189 +++++++++++++++++++ .../flink/runtime/jobmaster/JobMaster.java | 31 ++- .../minicluster/MiniClusterJobDispatcher.java | 12 +- .../backpressure/BackPressureStatsTracker.java | 16 ++ .../VoidBackPressureStatsTracker.java | 10 + .../runtime/dispatcher/DispatcherTest.java | 6 +- .../flink/runtime/jobmaster/JobMasterTest.java | 29 +-- .../TestingJobManagerSharedServicesBuilder.java | 101 ++++++++++ 13 files changed, 384 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index da63765..c83dce0 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; @@ -89,7 +89,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final HighAvailabilityServices highAvailabilityServices; private final ResourceManagerGateway resourceManagerGateway; - private final JobManagerServices jobManagerServices; + private final JobManagerSharedServices jobManagerSharedServices; private final HeartbeatServices heartbeatServices; private final BlobServer blobServer; private final MetricRegistry metricRegistry; @@ -127,7 +127,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme this.metricRegistry = Preconditions.checkNotNull(metricRegistry); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); - this.jobManagerServices = JobManagerServices.fromConfiguration( + this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( configuration, this.blobServer); @@ -155,7 +155,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme clearState(); try { - jobManagerServices.shutdown(); + jobManagerSharedServices.shutdown(); } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } @@ -234,7 +234,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme highAvailabilityServices, heartbeatServices, blobServer, - jobManagerServices, + jobManagerSharedServices, metricRegistry, new DispatcherOnCompleteActions(jobGraph.getJobID()), fatalErrorHandler); @@ -545,7 +545,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerServices jobManagerServices, + JobManagerSharedServices jobManagerSharedServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index 5533d59..6592e98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -76,7 +76,7 @@ public class StandaloneDispatcher extends Dispatcher { HighAvailabilityServices 
highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerServices jobManagerServices, + JobManagerSharedServices jobManagerSharedServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -89,7 +89,7 @@ public class StandaloneDispatcher extends Dispatcher { highAvailabilityServices, heartbeatServices, blobServer, - jobManagerServices, + jobManagerSharedServices, metricRegistry, onCompleteActions, fatalErrorHandler, http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index 1ff303b..e03c318 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint; @@ -73,7 +73,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { private ResourceManager<?> resourceManager; - private JobManagerServices jobManagerServices; + private JobManagerSharedServices jobManagerSharedServices; 
private JobMasterRestEndpoint jobMasterRestEndpoint; @@ -98,7 +98,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { - jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer); + jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, blobServer); resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever(); @@ -154,7 +154,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { ResourceID.generate(), rpcService, highAvailabilityServices, - jobManagerServices, + jobManagerSharedServices, heartbeatServices, blobServer, metricRegistry, @@ -204,7 +204,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - JobManagerServices jobManagerServices, + JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, BlobServer blobServer, MetricRegistry metricRegistry, @@ -221,7 +221,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { highAvailabilityServices, heartbeatServices, blobServer, - jobManagerServices, + jobManagerSharedServices, metricRegistry, new TerminatingOnCompleteActions(jobGraph.getJobID()), fatalErrorHandler, @@ -252,9 +252,9 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { } } - if (jobManagerServices != null) { + if (jobManagerSharedServices != null) { try { - jobManagerServices.shutdown(); + jobManagerSharedServices.shutdown(); } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 06700d9..8468f28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -84,7 +84,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F /** Leader election for this job. */ private final LeaderElectionService leaderElectionService; - private final JobManagerServices jobManagerServices; + private final JobManagerSharedServices jobManagerSharedServices; private final JobMaster jobManager; @@ -101,7 +101,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F * Exceptions that occur while creating the JobManager or JobManagerRunner are directly * thrown and not reported to the given {@code FatalErrorHandler}. * - * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerServices}. + * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerSharedServices}. * It will shut them down on error and on calls to {@link #shutdown()}. 
* * @throws Exception Thrown if the runner cannot be set up, because either one of the @@ -115,7 +115,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F final HighAvailabilityServices haServices, final HeartbeatServices heartbeatServices, final BlobServer blobServer, - final JobManagerServices jobManagerServices, + final JobManagerSharedServices jobManagerSharedServices, final MetricRegistry metricRegistry, final OnCompletionActions toNotifyOnComplete, final FatalErrorHandler errorHandler, @@ -128,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F this.jobGraph = checkNotNull(jobGraph); this.toNotifyOnComplete = checkNotNull(toNotifyOnComplete); this.errorHandler = checkNotNull(errorHandler); - this.jobManagerServices = checkNotNull(jobManagerServices); + this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices); checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); @@ -137,7 +137,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F this.jobManagerMetricGroup = jobManagerMetrics; // libraries and class loader first - final BlobLibraryCacheManager libraryCacheManager = jobManagerServices.libraryCacheManager; + final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager(); try { libraryCacheManager.registerJob( jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths()); @@ -154,6 +154,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F this.runningJobsRegistry = haServices.getRunningJobsRegistry(); this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID()); + this.timeout = jobManagerSharedServices.getTimeout(); + // now start the JobManager this.jobManager = new JobMaster( rpcService, @@ -161,20 +163,15 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F jobGraph, 
configuration, haServices, + jobManagerSharedServices, heartbeatServices, - jobManagerServices.executorService, blobServer, - jobManagerServices.restartStrategyFactory, - jobManagerServices.rpcAskTimeout, jobManagerMetrics, this, this, userCodeLoader, restAddress, - metricRegistry.getMetricQueryServicePath(), - jobManagerServices.backPressureStatsTrackerImpl); - - this.timeout = jobManagerServices.rpcAskTimeout; + metricRegistry.getMetricQueryServicePath()); } catch (Throwable t) { // clean up everything @@ -378,7 +375,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F onFatalError(new Exception("Could not start the job manager.", throwable)); } }, - jobManagerServices.executorService); + jobManagerSharedServices.getScheduledExecutorService()); } catch (Exception e) { onFatalError(new Exception("Could not start the job manager.", e)); } @@ -405,7 +402,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F onFatalError(new Exception("Could not start the job manager.", throwable)); } }, - jobManagerServices.executorService); + jobManagerSharedServices.getScheduledExecutorService()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java deleted file mode 100644 index 2295b5b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.WebOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; -import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl; -import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; -import org.apache.flink.runtime.util.ExecutorThreadFactory; -import org.apache.flink.runtime.util.Hardware; -import org.apache.flink.util.ExceptionUtils; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Utility class to hold all auxiliary services used by the {@link JobMaster}. 
- */ -public class JobManagerServices { - - public final ScheduledExecutorService executorService; - - public final BlobLibraryCacheManager libraryCacheManager; - - public final RestartStrategyFactory restartStrategyFactory; - - public final Time rpcAskTimeout; - - private final StackTraceSampleCoordinator stackTraceSampleCoordinator; - public final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl; - - public JobManagerServices( - ScheduledExecutorService executorService, - BlobLibraryCacheManager libraryCacheManager, - RestartStrategyFactory restartStrategyFactory, - Time rpcAskTimeout, - StackTraceSampleCoordinator stackTraceSampleCoordinator, - BackPressureStatsTrackerImpl backPressureStatsTrackerImpl) { - - this.executorService = checkNotNull(executorService); - this.libraryCacheManager = checkNotNull(libraryCacheManager); - this.restartStrategyFactory = checkNotNull(restartStrategyFactory); - this.rpcAskTimeout = checkNotNull(rpcAskTimeout); - this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator); - this.backPressureStatsTrackerImpl = checkNotNull(backPressureStatsTrackerImpl); - - executorService.scheduleWithFixedDelay( - backPressureStatsTrackerImpl::cleanUpOperatorStatsCache, - backPressureStatsTrackerImpl.getCleanUpInterval(), - backPressureStatsTrackerImpl.getCleanUpInterval(), - TimeUnit.MILLISECONDS); - } - - /** - * Shutdown the {@link JobMaster} services. - * - * <p>This method makes sure all services are closed or shut down, even when an exception occurred - * in the shutdown of one component. The first encountered exception is thrown, with successive - * exceptions added as suppressed exceptions. - * - * @throws Exception The first Exception encountered during shutdown. 
- */ - public void shutdown() throws Exception { - Throwable firstException = null; - - try { - executorService.shutdownNow(); - } catch (Throwable t) { - firstException = t; - } - - libraryCacheManager.shutdown(); - - stackTraceSampleCoordinator.shutDown(); - backPressureStatsTrackerImpl.shutDown(); - - if (firstException != null) { - ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services"); - } - } - - // ------------------------------------------------------------------------ - // Creating the components from a configuration - // ------------------------------------------------------------------------ - - public static JobManagerServices fromConfiguration( - Configuration config, - BlobServer blobServer) throws Exception { - - checkNotNull(config); - checkNotNull(blobServer); - - final String classLoaderResolveOrder = - config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); - - final String alwaysParentFirstLoaderString = - config.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER); - final String[] alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(";"); - - final BlobLibraryCacheManager libraryCacheManager = - new BlobLibraryCacheManager( - blobServer, - FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), - alwaysParentFirstLoaderPatterns); - - final FiniteDuration timeout; - try { - timeout = AkkaUtils.getTimeout(config); - } catch (NumberFormatException e) { - throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage()); - } - - final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool( - Hardware.getNumberCPUCores(), - new ExecutorThreadFactory("jobmanager-future")); - - final StackTraceSampleCoordinator stackTraceSampleCoordinator = - new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis()); - final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl = new BackPressureStatsTrackerImpl( - 
stackTraceSampleCoordinator, - config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL), - config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), - config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL), - Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY))); - - return new JobManagerServices( - futureExecutor, - libraryCacheManager, - RestartStrategyFactory.createRestartStrategyFactory(config), - Time.of(timeout.length(), timeout.unit()), - stackTraceSampleCoordinator, - backPressureStatsTrackerImpl); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java new file mode 100644 index 0000000..d028f88 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.util.ExceptionUtils; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class which holds all auxiliary shared services used by the {@link JobMaster}. + * Consequently, the {@link JobMaster} should never shut these services down. 
+ */ +public class JobManagerSharedServices { + + private final ScheduledExecutorService scheduledExecutorService; + + private final LibraryCacheManager libraryCacheManager; + + private final RestartStrategyFactory restartStrategyFactory; + + private final StackTraceSampleCoordinator stackTraceSampleCoordinator; + + private final BackPressureStatsTracker backPressureStatsTracker; + + private final Time timeout; + + public JobManagerSharedServices( + ScheduledExecutorService scheduledExecutorService, + LibraryCacheManager libraryCacheManager, + RestartStrategyFactory restartStrategyFactory, + StackTraceSampleCoordinator stackTraceSampleCoordinator, + BackPressureStatsTracker backPressureStatsTracker, + Time backPressureStatsTrackerCleanupInterval, + Time timeout) { + + this.scheduledExecutorService = checkNotNull(scheduledExecutorService); + this.libraryCacheManager = checkNotNull(libraryCacheManager); + this.restartStrategyFactory = checkNotNull(restartStrategyFactory); + this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator); + this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker); + this.timeout = checkNotNull(timeout); + + scheduledExecutorService.scheduleWithFixedDelay( + backPressureStatsTracker::cleanUpOperatorStatsCache, + backPressureStatsTrackerCleanupInterval.toMilliseconds(), + backPressureStatsTrackerCleanupInterval.toMilliseconds(), + TimeUnit.MILLISECONDS); + } + + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutorService; + } + + public LibraryCacheManager getLibraryCacheManager() { + return libraryCacheManager; + } + + public RestartStrategyFactory getRestartStrategyFactory() { + return restartStrategyFactory; + } + + public BackPressureStatsTracker getBackPressureStatsTracker() { + return backPressureStatsTracker; + } + + public Time getTimeout() { + return timeout; + } + + /** + * Shutdown the {@link JobMaster} services. 
+ * + * <p>This method makes sure all services are closed or shut down, even when an exception occurred + * in the shutdown of one component. The first encountered exception is thrown, with successive + * exceptions added as suppressed exceptions. + * + * @throws Exception The first Exception encountered during shutdown. + */ + public void shutdown() throws Exception { + Throwable firstException = null; + + try { + scheduledExecutorService.shutdownNow(); + } catch (Throwable t) { + firstException = t; + } + + libraryCacheManager.shutdown(); + stackTraceSampleCoordinator.shutDown(); + backPressureStatsTracker.shutDown(); + + if (firstException != null) { + ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services"); + } + } + + // ------------------------------------------------------------------------ + // Creating the components from a configuration + // ------------------------------------------------------------------------ + + public static JobManagerSharedServices fromConfiguration( + Configuration config, + BlobServer blobServer) throws Exception { + + checkNotNull(config); + checkNotNull(blobServer); + + final String classLoaderResolveOrder = + config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + + final String alwaysParentFirstLoaderString = + config.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER); + final String[] alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(";"); + + final BlobLibraryCacheManager libraryCacheManager = + new BlobLibraryCacheManager( + blobServer, + FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), + alwaysParentFirstLoaderPatterns); + + final FiniteDuration timeout; + try { + timeout = AkkaUtils.getTimeout(config); + } catch (NumberFormatException e) { + throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage()); + } + + final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool( + 
Hardware.getNumberCPUCores(), + new ExecutorThreadFactory("jobmanager-future")); + + final StackTraceSampleCoordinator stackTraceSampleCoordinator = + new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis()); + final int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL); + final BackPressureStatsTrackerImpl backPressureStatsTracker = new BackPressureStatsTrackerImpl( + stackTraceSampleCoordinator, + cleanUpInterval, + config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), + config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL), + Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY))); + + return new JobManagerSharedServices( + futureExecutor, + libraryCacheManager, + RestartStrategyFactory.createRestartStrategyFactory(config), + stackTraceSampleCoordinator, + backPressureStatsTracker, + Time.milliseconds(cleanUpInterval), + Time.milliseconds(timeout.toMillis())); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index a8f1154..bf24dc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -171,7 +171,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; /** The execution context which is used to execute futures. 
*/ - private final Executor executor; + private final ScheduledExecutorService scheduledExecutorService; private final OnCompletionActions jobCompletionActions; @@ -214,18 +214,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast JobGraph jobGraph, Configuration configuration, HighAvailabilityServices highAvailabilityService, + JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, - ScheduledExecutorService executor, BlobServer blobServer, - RestartStrategyFactory restartStrategyFactory, - Time rpcAskTimeout, @Nullable JobManagerMetricGroup jobManagerMetricGroup, OnCompletionActions jobCompletionActions, FatalErrorHandler errorHandler, ClassLoader userCodeLoader, @Nullable String restAddress, - @Nullable String metricQueryServicePath, - BackPressureStatsTracker backPressureStatsTracker) throws Exception { + @Nullable String metricQueryServicePath) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME)); @@ -233,10 +230,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast this.resourceId = checkNotNull(resourceId); this.jobGraph = checkNotNull(jobGraph); - this.rpcTimeout = rpcAskTimeout; + this.rpcTimeout = jobManagerSharedServices.getTimeout(); this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.blobServer = checkNotNull(blobServer); - this.executor = checkNotNull(executor); + this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService(); this.jobCompletionActions = checkNotNull(jobCompletionActions); this.errorHandler = checkNotNull(errorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); @@ -271,7 +268,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final RestartStrategy restartStrategy = (restartStrategyConfiguration != null) ? 
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : - restartStrategyFactory.createRestartStrategy(); + jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy(); log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); @@ -294,12 +291,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast null, jobGraph, configuration, - executor, - executor, + scheduledExecutorService, + scheduledExecutorService, slotPool.getSlotProvider(), userCodeLoader, checkpointRecoveryFactory, - rpcAskTimeout, + rpcTimeout, restartStrategy, jobMetricGroup, -1, @@ -316,7 +313,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast .orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint."))); this.metricQueryServicePath = metricQueryServicePath; - this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker); + this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker()); } //---------------------------------------------------------------------------------------------- @@ -813,7 +810,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Override public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) { - return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor); + return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), scheduledExecutorService); } @Override @@ -963,7 +960,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } // start scheduling job in another thread - executor.execute( + scheduledExecutorService.execute( () -> { try { executionGraph.scheduleForExecution(); @@ -1034,7 +1031,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements 
JobMast if (newJobStatus.isGloballyTerminalState()) { final ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFrom(executionGraph); - executor.execute(() -> jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph)); + scheduledExecutorService.execute(() -> jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph)); } } @@ -1069,7 +1066,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast getFencingToken(), resourceManagerAddress, resourceManagerId, - executor); + scheduledExecutorService); resourceManagerConnection.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index e5ee9d7..6fba534 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -79,7 +79,7 @@ public class MiniClusterJobDispatcher { private final BlobServer blobServer; /** all the services that the JobManager needs, 
such as BLOB service, factories, etc. */ - private final JobManagerServices jobManagerServices; + private final JobManagerSharedServices jobManagerSharedServices; /** Registry for all metrics in the mini cluster. */ private final MetricRegistry metricRegistry; @@ -155,7 +155,7 @@ public class MiniClusterJobDispatcher { this.numJobManagers = numJobManagers; LOG.info("Creating JobMaster services"); - this.jobManagerServices = JobManagerServices.fromConfiguration(config, blobServer); + this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(config, blobServer); } // ------------------------------------------------------------------------ @@ -191,9 +191,9 @@ public class MiniClusterJobDispatcher { } } - // shut down the JobManagerServices + // shut down the JobManagerSharedServices try { - jobManagerServices.shutdown(); + jobManagerSharedServices.shutdown(); } catch (Throwable throwable) { exception = ExceptionUtils.firstOrSuppressed(throwable, exception); } @@ -285,7 +285,7 @@ public class MiniClusterJobDispatcher { haServices, heartbeatServices, blobServer, - jobManagerServices, + jobManagerSharedServices, metricRegistry, onCompletion, errorHandler, http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java index 356c8b8..7c447c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java @@ -19,6 +19,7 @@ package 
org.apache.flink.runtime.rest.handler.legacy.backpressure; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.util.FlinkException; import java.util.Optional; @@ -35,4 +36,19 @@ public interface BackPressureStatsTracker { * @return Back pressure statistics for an operator */ Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex); + + /** + * Cleans up the operator stats cache if it contains timed out entries. + * + * <p>The Guava cache only evicts as maintenance during normal operations. + * If this handler is inactive, it will never be cleaned. + */ + void cleanUpOperatorStatsCache(); + + /** + * Shuts the BackPressureStatsTracker down. + * + * @throws FlinkException if the BackPressureStatsTracker could not be shut down + */ + void shutDown() throws FlinkException; } http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java index 2a55d47..567af66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java @@ -32,5 +32,15 @@ public enum VoidBackPressureStatsTracker implements BackPressureStatsTracker { public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) { return Optional.empty(); } + + @Override + public void cleanUpOperatorStatsCache() { + // nothing to do + } + + @Override + public void shutDown() { + // 
nothing to do + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 0d58f62..8fe3c3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; @@ -392,7 +392,7 @@ public class DispatcherTest extends TestLogger { HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerServices jobManagerServices, + JobManagerSharedServices jobManagerSharedServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -406,7 +406,7 @@ public class DispatcherTest extends TestLogger { highAvailabilityServices, heartbeatServices, blobServer, - jobManagerServices, + jobManagerSharedServices, metricRegistry, onCompleteActions, fatalErrorHandler, 
http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 027ba5e..29f8b38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; @@ -40,7 +39,6 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -55,7 +53,6 @@ import org.junit.experimental.categories.Category; import java.net.URL; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static 
org.hamcrest.MatcherAssert.assertThat; @@ -104,6 +101,11 @@ public class JobMasterTest extends TestLogger { final JobGraph jobGraph = new JobGraph(); Configuration configuration = new Configuration(); + + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder() + .setTimeout(testingTimeout) + .build(); + try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore())) { blobServer.start(); @@ -113,18 +115,15 @@ public class JobMasterTest extends TestLogger { jobGraph, configuration, haServices, + jobManagerSharedServices, heartbeatServices, - Executors.newScheduledThreadPool(1), blobServer, - new NoRestartStrategy.NoRestartStrategyFactory(), - testingTimeout, null, new NoOpOnCompletionActions(), testingFatalErrorHandler, FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()), null, - null, - VoidBackPressureStatsTracker.INSTANCE); + null); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -152,6 +151,7 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler.rethrowError(); } finally { + jobManagerSharedServices.shutdown(); rpc.stopService(); } } @@ -203,6 +203,11 @@ public class JobMasterTest extends TestLogger { final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); Configuration configuration = new Configuration(); + + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder() + .setTimeout(testingTimeout) + .build(); + try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore())) { blobServer.start(); @@ -212,18 +217,15 @@ public class JobMasterTest extends TestLogger { jobGraph, configuration, haServices, + jobManagerSharedServices, heartbeatServices, - Executors.newScheduledThreadPool(1), blobServer, - new NoRestartStrategy.NoRestartStrategyFactory(), - testingTimeout, null, new NoOpOnCompletionActions(), 
testingFatalErrorHandler, FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()), null, - null, - VoidBackPressureStatsTracker.INSTANCE); + null); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -251,6 +253,7 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler.rethrowError(); } finally { + jobManagerSharedServices.shutdown(); rpc.stopService(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/24c30878/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java new file mode 100644 index 0000000..74b97a4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker; +import org.apache.flink.runtime.testingUtils.TestingUtils; + +import java.util.concurrent.ScheduledExecutorService; + +import static org.mockito.Mockito.mock; + +/** + * Builder for the {@link JobManagerSharedServices}. + */ +public class TestingJobManagerSharedServicesBuilder { + + private ScheduledExecutorService scheduledExecutorService; + + private LibraryCacheManager libraryCacheManager; + + private RestartStrategyFactory restartStrategyFactory; + + private StackTraceSampleCoordinator stackTraceSampleCoordinator; + + private BackPressureStatsTracker backPressureStatsTracker; + + private Time timeout; + + public TestingJobManagerSharedServicesBuilder() { + scheduledExecutorService = TestingUtils.defaultExecutor(); + libraryCacheManager = mock(LibraryCacheManager.class); + restartStrategyFactory = new NoRestartStrategy.NoRestartStrategyFactory(); + stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class); + backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE; + } + + public TestingJobManagerSharedServicesBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; + return this; + } + + public TestingJobManagerSharedServicesBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) { + this.libraryCacheManager = libraryCacheManager; 
+ return this; + + } + + public TestingJobManagerSharedServicesBuilder setRestartStrategyFactory(RestartStrategyFactory restartStrategyFactory) { + this.restartStrategyFactory = restartStrategyFactory; + return this; + } + + public TestingJobManagerSharedServicesBuilder setStackTraceSampleCoordinator(StackTraceSampleCoordinator stackTraceSampleCoordinator) { + this.stackTraceSampleCoordinator = stackTraceSampleCoordinator; + return this; + } + + public TestingJobManagerSharedServicesBuilder setBackPressureStatsTracker(BackPressureStatsTracker backPressureStatsTracker) { + this.backPressureStatsTracker = backPressureStatsTracker; + return this; + + } + + public TestingJobManagerSharedServicesBuilder setTimeout(Time timeout) { + this.timeout = timeout; + return this; + } + + public JobManagerSharedServices build() { + return new JobManagerSharedServices( + scheduledExecutorService, + libraryCacheManager, + restartStrategyFactory, + stackTraceSampleCoordinator, + backPressureStatsTracker, + timeout, + timeout); + } +}