Repository: reef Updated Branches: refs/heads/master 73c28280c -> 5f4591083
[REEF-1657] Better thread names in REEF and Wake This work is a step towards a cleaner shutdown of REEF applications, required for the "REEF as a library" [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) project. There are no functionality changes in this PR. Summary of changes: * More descriptive names for threads created in Wake `OrderedRemoteReceiverStage` and `DefaultThreadFactory` * Better thread names in REEF `ProcessContainer` and `EvaluatorMessageDispatcher` * Better logging in `REEFErrorHandler` and in many `.close()` methods in REEF * Minor refactoring in shutdown code; no functionality changes JIRA: [REEF-1657](https://issues.apache.org/jira/browse/REEF-1657) Pull Request: This closes #1172 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5f459108 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5f459108 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5f459108 Branch: refs/heads/master Commit: 5f4591083b3ee74eacc70770193913e6972812b2 Parents: 73c2828 Author: Sergiy Matusevych <[email protected]> Authored: Fri Oct 28 00:35:38 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Oct 31 13:01:30 2016 -0700 ---------------------------------------------------------------------- .../evaluator/EvaluatorMessageDispatcher.java | 20 ++-- .../runtime/common/launch/REEFErrorHandler.java | 24 ++-- .../runtime/common/utils/RemoteManager.java | 11 +- .../runtime/local/driver/ProcessContainer.java | 18 +-- .../reef/wake/impl/DefaultThreadFactory.java | 23 ++-- .../DefaultRemoteManagerImplementation.java | 109 ++++++++++--------- .../remote/impl/OrderedRemoteReceiverStage.java | 69 +++++------- 7 files changed, 148 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java index fc3a83b..18e868a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java @@ -45,6 +45,8 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName()); + private final String evaluatorIdentifier; + /** * Dispatcher used for application provided event handlers. */ @@ -55,7 +57,6 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { */ private final DispatchingEStage serviceDispatcher; - /** * Dispatcher used for application provided driver-restart specific event handlers. 
*/ @@ -67,7 +68,7 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { private final DispatchingEStage driverRestartServiceDispatcher; @Inject - EvaluatorMessageDispatcher( + private EvaluatorMessageDispatcher( // Application-provided Context event handlers @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers, @Parameter(ContextClosedHandlers.class) final Set<EventHandler<ClosedContext>> contextClosedHandlers, @@ -129,10 +130,14 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads, @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier, final DriverExceptionHandler driverExceptionHandler, - final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory - ) { + final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory) { + + LOG.log(Level.FINER, "Creating message dispatcher for {0}", evaluatorIdentifier); + + this.evaluatorIdentifier = evaluatorIdentifier; + this.serviceDispatcher = new DispatchingEStage( + driverExceptionHandler, numberOfThreads, "EvaluatorMessageDispatcher:" + evaluatorIdentifier); - this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier); this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher); this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher); this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher); @@ -282,9 +287,8 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { @Override public void close() throws Exception { - /** - * This effectively closes all dispatchers as they share the same stage. - */ + LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier); + // This effectively closes all dispatchers as they share the same stage. 
this.serviceDispatcher.close(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java index e32537c..0611919 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java @@ -39,6 +39,7 @@ import java.util.logging.Logger; public final class REEFErrorHandler implements EventHandler<Throwable>, AutoCloseable { private static final Logger LOG = Logger.getLogger(REEFErrorHandler.class.getName()); + private static final String CLASS_NAME = REEFErrorHandler.class.getCanonicalName(); // This class is used as the ErrorHandler in the RemoteManager. Hence, we need an InjectionFuture here. 
private final InjectionFuture<RemoteManager> remoteManager; @@ -47,10 +48,12 @@ public final class REEFErrorHandler implements EventHandler<Throwable>, AutoClos private final ExceptionCodec exceptionCodec; @Inject - REEFErrorHandler(final InjectionFuture<RemoteManager> remoteManager, - @Parameter(ErrorHandlerRID.class) final String errorHandlerRID, - @Parameter(LaunchID.class) final String launchID, - final ExceptionCodec exceptionCodec) { + REEFErrorHandler( + @Parameter(ErrorHandlerRID.class) final String errorHandlerRID, + @Parameter(LaunchID.class) final String launchID, + final InjectionFuture<RemoteManager> remoteManager, + final ExceptionCodec exceptionCodec) { + this.errorHandlerRID = errorHandlerRID; this.remoteManager = remoteManager; this.launchID = launchID; @@ -91,19 +94,22 @@ public final class REEFErrorHandler implements EventHandler<Throwable>, AutoClos @SuppressWarnings("checkstyle:illegalcatch") public void close() { + + LOG.entering(CLASS_NAME, "close"); + try { this.remoteManager.get().close(); } catch (final Throwable ex) { LOG.log(Level.SEVERE, "Unable to close the remote manager", ex); } + + LOG.exiting(CLASS_NAME, "close"); } @Override public String toString() { - return "REEFErrorHandler{" + - "remoteManager=" + remoteManager + - ", launchID='" + launchID + '\'' + - ", errorHandlerRID='" + errorHandlerRID + '\'' + - '}'; + return String.format( + "REEFErrorHandler: { remoteManager:{%s}, launchID:%s, errorHandlerRID:%s }", + this.remoteManager.get(), this.launchID, this.errorHandlerRID); } } http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java index 0eeff08..34fb065 100644 --- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java @@ -32,6 +32,7 @@ import java.util.logging.Logger; public class RemoteManager { private static final Logger LOG = Logger.getLogger(RemoteManager.class.getName()); + private static final String CLASS_NAME = RemoteManager.class.getCanonicalName(); private final org.apache.reef.wake.remote.RemoteManager raw; private final RemoteIdentifierFactory factory; @@ -41,7 +42,7 @@ public class RemoteManager { final RemoteIdentifierFactory factory) { this.raw = raw; this.factory = factory; - LOG.log(Level.FINE, "Instantiated 'RemoteManager' with remoteId: {0}", this.getMyIdentifier()); + LOG.log(Level.FINE, "Instantiated RemoteManager wrapper: {0}", this.raw); } public final org.apache.reef.wake.remote.RemoteManager raw() { @@ -49,7 +50,9 @@ public class RemoteManager { } public void close() throws Exception { + LOG.entering(CLASS_NAME, "close"); this.raw.close(); + LOG.exiting(CLASS_NAME, "close"); } public <T> EventHandler<T> getHandler( @@ -71,5 +74,9 @@ public class RemoteManager { public String getMyIdentifier() { return this.raw.getMyIdentifier().toString(); } -} + @Override + public String toString() { + return "RemoteManager wrap: " + this.raw; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java index 160de05..6805141 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java +++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java @@ -55,6 +55,7 @@ public final class ProcessContainer implements Container { private final File globalFolder; private final RunnableProcessObserver processObserver; private final ThreadGroup threadGroup; + private Thread theThread; private RunnableProcess process; @@ -134,13 +135,16 @@ public final class ProcessContainer implements Container { @Override public void run(final List<String> commandLine) { - this.process = new RunnableProcess(commandLine, + + this.process = new RunnableProcess( + commandLine, this.containedID, this.folder, this.processObserver, this.fileNames.getEvaluatorStdoutFileName(), this.fileNames.getEvaluatorStderrFileName()); - this.theThread = new Thread(this.threadGroup, this.process, this.containedID); + + this.theThread = new Thread(this.threadGroup, this.process, "ProcessContainer:" + this.containedID); this.theThread.start(); } @@ -189,12 +193,8 @@ public final class ProcessContainer implements Container { @Override public String toString() { - return "ProcessContainer{" + - "containedID='" + containedID + '\'' + - ", nodeID='" + nodeID + '\'' + - ", errorHandlerRID='" + errorHandlerRID + '\'' + - ", folder=" + folder + '\'' + - ", rack=" + rackName + - '}'; + return String.format( + "ProcessContainer{containedID=%s, nodeID=%s, errorHandlerRID=%s, folder=%s, rack=%s}", + this.containedID, this.nodeID, this.errorHandlerRID, this.folder, this.rackName); } } http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java index 53c6efb..1612c39 100644 --- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java @@ -25,10 +25,13 @@ import java.util.concurrent.atomic.AtomicInteger; * A default thread factory implementation that names created threads. */ public final class DefaultThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); + private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String prefix; + private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; /** @@ -37,10 +40,7 @@ public final class DefaultThreadFactory implements ThreadFactory { * @param prefix the name prefix of the created thread */ public DefaultThreadFactory(final String prefix) { - final SecurityManager s = System.getSecurityManager(); - this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.prefix = prefix + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; - this.uncaughtExceptionHandler = null; + this(prefix, null); } /** @@ -52,7 +52,7 @@ public final class DefaultThreadFactory implements ThreadFactory { public DefaultThreadFactory(final String prefix, final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { final SecurityManager s = System.getSecurityManager(); this.group = (s != null) ? 
s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.prefix = prefix + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; + this.prefix = String.format("%s:pool-%02d", prefix, POOL_NUMBER.getAndIncrement()); this.uncaughtExceptionHandler = uncaughtExceptionHandler; } @@ -72,17 +72,22 @@ public final class DefaultThreadFactory implements ThreadFactory { */ @Override public Thread newThread(final Runnable r) { - final Thread t = new Thread(group, r, prefix + threadNumber.getAndIncrement(), 0); + + final Thread t = new Thread(this.group, r, + String.format("%s:thread-%03d", this.prefix, this.threadNumber.getAndIncrement()), 0); + if (t.isDaemon()) { t.setDaemon(false); } + if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } - if (uncaughtExceptionHandler != null) { - t.setUncaughtExceptionHandler(uncaughtExceptionHandler); + + if (this.uncaughtExceptionHandler != null) { + t.setUncaughtExceptionHandler(this.uncaughtExceptionHandler); } + return t; } - } http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index 6012ba1..243fa31 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -167,64 +167,75 @@ public final class DefaultRemoteManagerImplementation implements RemoteManager { @Override public void close() { - if (this.closed.compareAndSet(false, true)) { - - LOG.log(Level.FINE, "RemoteManager: {0} Closing 
remote manager id: {1}", - new Object[] {this.name, this.myIdentifier}); - - final Runnable closeRunnable = new Runnable() { - @Override - public void run() { - try { - LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier); - reSendStage.close(); - LOG.log(Level.FINE, "Closed the remote sender stage"); - } catch (final Exception e) { - LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e); - } - - try { - LOG.log(Level.FINE, "Closing transport {0}", myIdentifier); - transport.close(); - LOG.log(Level.FINE, "Closed the transport"); - } catch (final Exception e) { - LOG.log(Level.SEVERE, "Unable to close the transport.", e); - } - - try { - LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier); - reRecvStage.close(); - LOG.log(Level.FINE, "Closed the remote receiver stage"); - } catch (final Exception e) { - LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e); - } - } - }; + LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}", + new Object[] {this.name, this.myIdentifier}); - final ExecutorService closeExecutor = Executors.newSingleThreadExecutor(); + if (!this.closed.compareAndSet(false, true)) { + LOG.log(Level.FINE, "RemoteManager: {0} already closed", this.name); + return; + } - closeExecutor.submit(closeRunnable); - closeExecutor.shutdown(); + final Runnable closeRunnable = new Runnable() { + @Override + public void run() { - if (!closeExecutor.isShutdown()) { - LOG.log(Level.SEVERE, "close executor did not shutdown properly."); - } + Thread.currentThread().setName(String.format("CLOSE:RemoteManager:%s:%s", name, myIdentifier)); + + try { + LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier); + reSendStage.close(); + LOG.log(Level.FINE, "Closed the remote sender stage"); + } catch (final Exception e) { + LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e); + } + + try { + LOG.log(Level.FINE, "Closing transport {0}", myIdentifier); + transport.close(); + 
LOG.log(Level.FINE, "Closed the transport"); + } catch (final Exception e) { + LOG.log(Level.SEVERE, "Unable to close the transport.", e); + } - final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT; - while (!closeExecutor.isTerminated()) { try { - final long waitTime = endTime - System.currentTimeMillis(); - closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS); - } catch (final InterruptedException e) { - LOG.log(Level.FINE, "Interrupted", e); + LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier); + reRecvStage.close(); + LOG.log(Level.FINE, "Closed the remote receiver stage"); + } catch (final Exception e) { + LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e); } } + }; - if (closeExecutor.isTerminated()) { - LOG.log(Level.FINE, "Close executor terminated properly."); - } else { - LOG.log(Level.SEVERE, "Close executor did not terminate properly."); + final ExecutorService closeExecutor = Executors.newSingleThreadExecutor(); + + closeExecutor.submit(closeRunnable); + closeExecutor.shutdown(); + + if (!closeExecutor.isShutdown()) { + LOG.log(Level.SEVERE, "close executor did not shutdown properly."); + } + + final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT; + while (!closeExecutor.isTerminated()) { + try { + final long waitTime = endTime - System.currentTimeMillis(); + closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + LOG.log(Level.FINE, "Interrupted", e); } } + + if (closeExecutor.isTerminated()) { + LOG.log(Level.FINE, "Close executor terminated properly."); + } else { + LOG.log(Level.SEVERE, "Close executor did not terminate properly."); + } + } + + @Override + public String toString() { + return String.format("RemoteManager: { class:%s, name:%s, id:%s }", + this.getClass().getCanonicalName(), this.name, this.myIdentifier); } } 
http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java index 1d2d895..19a8162 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java @@ -37,14 +37,15 @@ import java.util.logging.Logger; public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { private static final Logger LOG = Logger.getLogger(OrderedRemoteReceiverStage.class.getName()); - private final long shutdownTimeout = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT; - private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap; + private static final String CLASS_NAME = OrderedRemoteReceiverStage.class.getSimpleName(); + + private static final long SHUTDOWN_TIMEOUT = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT; + private final ExecutorService pushExecutor; private final ExecutorService pullExecutor; private final ThreadPoolStage<TransportEvent> pushStage; - private final ThreadPoolStage<OrderedEventStream> pullStage; /** * Constructs an ordered remote receiver stage. 
@@ -54,58 +55,46 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { */ public OrderedRemoteReceiverStage( final EventHandler<RemoteEvent<byte[]>> handler, final EventHandler<Throwable> errorHandler) { - this.streamMap = new ConcurrentHashMap<SocketAddress, OrderedEventStream>(); - this.pushExecutor = Executors.newCachedThreadPool( - new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Push")); - this.pullExecutor = Executors.newCachedThreadPool( - new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Pull")); + this.pushExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(CLASS_NAME + ":Push")); + this.pullExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(CLASS_NAME + ":Pull")); + + final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap = new ConcurrentHashMap<>(); - this.pullStage = new ThreadPoolStage<OrderedEventStream>( + final ThreadPoolStage<OrderedEventStream> pullStage = new ThreadPoolStage<>( new OrderedPullEventHandler(handler), this.pullExecutor, errorHandler); - this.pushStage = new ThreadPoolStage<TransportEvent>( + + this.pushStage = new ThreadPoolStage<>( new OrderedPushEventHandler(streamMap, pullStage), this.pushExecutor, errorHandler); // for decoupling } @Override public void onNext(final TransportEvent value) { - LOG.log(Level.FINEST, "{0}", value); - pushStage.onNext(value); + LOG.log(Level.FINEST, "Push: {0}", value); + this.pushStage.onNext(value); } @Override public void close() throws Exception { - LOG.log(Level.FINE, "close"); - - if (pushExecutor != null) { - pushExecutor.shutdown(); - try { - // wait for threads to finish for timeout - if (!pushExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { - LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - final List<Runnable> droppedRunnables = pushExecutor.shutdownNow(); - LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() 
+ " tasks."); - } - } catch (final InterruptedException e) { - LOG.log(Level.WARNING, "Close interrupted"); - throw new RemoteRuntimeException(e); - } - } + close("PushExecutor", this.pushExecutor); + close("PullExecutor", this.pullExecutor); + } - if (pullExecutor != null) { - pullExecutor.shutdown(); - try { - // wait for threads to finish for timeout - if (!pullExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { - LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - final List<Runnable> droppedRunnables = pullExecutor.shutdownNow(); - LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); - } - } catch (final InterruptedException e) { - LOG.log(Level.WARNING, "Close interrupted"); - throw new RemoteRuntimeException(e); + private static void close(final String name, final ExecutorService executor) { + LOG.log(Level.FINE, "Close {0} begin", name); + executor.shutdown(); + try { + // wait for threads to finish for timeout + if (!executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { + LOG.log(Level.WARNING, "{0}: Executor did not terminate in {1} ms.", new Object[] {name, SHUTDOWN_TIMEOUT}); + final List<Runnable> droppedRunnables = executor.shutdownNow(); + LOG.log(Level.WARNING, "{0}: Executor dropped {1} tasks.", new Object[] {name, droppedRunnables.size()}); } + } catch (final InterruptedException e) { + LOG.log(Level.WARNING, "Close interrupted"); + throw new RemoteRuntimeException(e); } + LOG.log(Level.FINE, "Close {0} end", name); } }
