Repository: reef Updated Branches: refs/heads/master 8e628faeb -> 31fbd0e99
[REEF-1654] Implement graceful shutdown of Wake executor services This is work towards "REEF as a library" project [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) Summary of changes: * Catch the `InterruptedException` in `ThreadPoolStage` and make sure its `.close()` method never throws * Gracefully shut down threads in `NettyMessagingTransport.close()` * Catch errors when closing the acceptor channel in `NettyMessagingTransport` * Make sure `NettyMessagingTransport.close()` never throws * Improve logging in `.close()` methods of `NettyMessagingTransport` and `ThreadPoolStage` * Minor refactoring for readability JIRA: [REEF-1654](https://issues.apache.org/jira/browse/REEF-1654) Pull Request: This closes #1174 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/31fbd0e9 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/31fbd0e9 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/31fbd0e9 Branch: refs/heads/master Commit: 31fbd0e997534f14ec826470e838deda3ea6012f Parents: 8e628fa Author: Sergiy Matusevych <[email protected]> Authored: Mon Oct 31 22:13:54 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Nov 1 17:03:36 2016 -0700 ---------------------------------------------------------------------- .../apache/reef/wake/impl/ThreadPoolStage.java | 29 +++++++++++---- .../netty/NettyMessagingTransport.java | 38 ++++++++++++-------- 2 files changed, 46 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/31fbd0e9/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java index 
3b39c8a..7b6107f 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java @@ -40,13 +40,15 @@ import java.util.logging.Logger; * @param <T> type */ public final class ThreadPoolStage<T> extends AbstractEStage<T> { + private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName()); + private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; + private final EventHandler<T> handler; + private final EventHandler<Throwable> errorHandler; private final ExecutorService executor; private final int numThreads; - private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; - private final EventHandler<Throwable> errorHandler; /** * Constructs a thread-pool stage. @@ -206,14 +208,29 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> { * Closes resources. */ @Override - public void close() throws Exception { + public void close() { + if (closed.compareAndSet(false, true) && numThreads > 0) { + + LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name); + executor.shutdown(); - if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { - LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); + + boolean isTerminated = false; + try { + isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (final InterruptedException ex) { + LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex); + } + + if (!isTerminated) { final List<Runnable> droppedRunnables = executor.shutdownNow(); - LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); + LOG.log(Level.SEVERE, + "Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. 
Dropping {2} tasks", + new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()}); } + + LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name); } } http://git-wip-us.apache.org/repos/asf/reef/blob/31fbd0e9/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index c37e556..c3a910b 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -61,9 +61,15 @@ import java.util.logging.Logger; /** * Messaging transport implementation with Netty. */ -public class NettyMessagingTransport implements Transport { +public final class NettyMessagingTransport implements Transport { + + /** + * Indicates a hostname that isn't set or known. + */ + public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##"; + + private static final String CLASS_NAME = NettyMessagingTransport.class.getSimpleName(); - private static final String CLASS_NAME = NettyMessagingTransport.class.getName(); private static final Logger LOG = Logger.getLogger(CLASS_NAME); private static final int SERVER_BOSS_NUM_THREADS = 3; @@ -91,10 +97,6 @@ public class NettyMessagingTransport implements Transport { private final int numberOfTries; private final int retryTimeout; - /** - * Indicates a hostname that isn't set or known. - */ - public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##"; /** * Constructs a messaging transport. 
@@ -108,7 +110,7 @@ public class NettyMessagingTransport implements Transport { * @param tcpPortProvider gives an iterator that produces random tcp ports in a range */ @Inject - NettyMessagingTransport( + private NettyMessagingTransport( @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, @Parameter(RemoteConfiguration.Port.class) final int port, @Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage, @@ -131,11 +133,11 @@ public class NettyMessagingTransport implements Transport { this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage); this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, - new DefaultThreadFactory(CLASS_NAME + "ServerBoss")); + new DefaultThreadFactory(CLASS_NAME + ":ServerBoss")); this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS, - new DefaultThreadFactory(CLASS_NAME + "ServerWorker")); + new DefaultThreadFactory(CLASS_NAME + ":ServerWorker")); this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, - new DefaultThreadFactory(CLASS_NAME + "ClientWorker")); + new DefaultThreadFactory(CLASS_NAME + ":ClientWorker")); this.clientBootstrap = new Bootstrap(); this.clientBootstrap.group(this.clientWorkerGroup) @@ -211,16 +213,22 @@ public class NettyMessagingTransport implements Transport { * Closes all channels and releases all resources. 
*/ @Override - public void close() throws Exception { + public void close() { LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress); this.clientChannelGroup.close().awaitUninterruptibly(); this.serverChannelGroup.close().awaitUninterruptibly(); - this.acceptor.close().sync(); - this.clientWorkerGroup.shutdownGracefully(); - this.serverBossGroup.shutdownGracefully(); - this.serverWorkerGroup.shutdownGracefully(); + + try { + this.acceptor.close().sync(); + } catch (final Exception ex) { + LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex); + } + + this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly(); + this.serverBossGroup.shutdownGracefully().awaitUninterruptibly(); + this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly(); LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress); }
