Repository: reef Updated Branches: refs/heads/master bcfafbc34 -> 216ecdec0
[REEF-1729] Fix test job timeouts in Travis CI Gracefully shut down all worker groups and wait for them to complete in the `.close()` method JIRA: [REEF-1729](https://issues.apache.org/jira/browse/REEF-1729) Closes #1268 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/216ecdec Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/216ecdec Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/216ecdec Branch: refs/heads/master Commit: 216ecdec0f47fa5414095cfe9366db91dda90b4c Parents: bcfafbc Author: taegeonum <[email protected]> Authored: Fri Mar 17 20:04:20 2017 +0900 Committer: Sergiy Matusevych <[email protected]> Committed: Mon Mar 20 15:47:58 2017 -0700 ---------------------------------------------------------------------- .../netty/NettyMessagingTransport.java | 24 +++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/216ecdec/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index c3a910b..2643030 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -25,10 +25,12 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; import 
io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EStage; @@ -51,6 +53,7 @@ import java.net.BindException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -217,18 +220,27 @@ public final class NettyMessagingTransport implements Transport { LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress); - this.clientChannelGroup.close().awaitUninterruptibly(); - this.serverChannelGroup.close().awaitUninterruptibly(); + final ChannelGroupFuture clientChannelGroupFuture = this.clientChannelGroup.close(); + final ChannelGroupFuture serverChannelGroupFuture = this.serverChannelGroup.close(); + final ChannelFuture acceptorFuture = this.acceptor.close(); + + final ArrayList<Future> eventLoopGroupFutures = new ArrayList<>(3); + eventLoopGroupFutures.add(this.clientWorkerGroup.shutdownGracefully()); + eventLoopGroupFutures.add(this.serverBossGroup.shutdownGracefully()); + eventLoopGroupFutures.add(this.serverWorkerGroup.shutdownGracefully()); + + clientChannelGroupFuture.awaitUninterruptibly(); + serverChannelGroupFuture.awaitUninterruptibly(); try { - this.acceptor.close().sync(); + acceptorFuture.sync(); } catch (final Exception ex) { LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex); } - this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly(); - this.serverBossGroup.shutdownGracefully().awaitUninterruptibly(); - this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly(); + 
for (final Future eventLoopGroupFuture : eventLoopGroupFutures) { + eventLoopGroupFuture.awaitUninterruptibly(); + } LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress); }
