Repository: twill Updated Branches: refs/heads/master aa70499ea -> b7785bde4
(TWILL-248) Speedup shutdown of tracker service This closes #63 on Github Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/b7785bde Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/b7785bde Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/b7785bde Branch: refs/heads/master Commit: b7785bde4e7e990072f803d89353e37f26ed8af5 Parents: aa70499 Author: Terence Yim <[email protected]> Authored: Mon Oct 30 15:10:34 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Tue Oct 31 10:52:16 2017 -0700 ---------------------------------------------------------------------- .../internal/appmaster/TrackerService.java | 26 ++++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/b7785bde/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java index bb8cf57..10de10c 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java @@ -31,6 +31,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -48,6 +50,8 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.twill.api.ResourceReport; import org.apache.twill.internal.json.ResourceReportAdapter; import org.slf4j.Logger; @@ -60,6 +64,8 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -82,7 +88,7 @@ public final class TrackerService extends AbstractIdleService { private String host; private ServerBootstrap bootstrap; - private Channel serverChannel; + private ChannelGroup channelGroup; private InetSocketAddress bindAddress; private URL url; @@ -119,6 +125,7 @@ public final class TrackerService extends AbstractIdleService { @Override protected void startUp() throws Exception { + channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("boss-thread").build()); @@ -132,6 +139,7 @@ public final class TrackerService extends AbstractIdleService { .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { + channelGroup.add(ch); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("codec", new HttpServerCodec()); pipeline.addLast("compressor", new HttpContentCompressor()); @@ -140,7 +148,9 @@ public final class TrackerService extends AbstractIdleService { } }); - serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel(); + Channel serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel(); + channelGroup.add(serverChannel); + bindAddress = (InetSocketAddress) serverChannel.localAddress(); url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL(); @@ -149,9 +159,15 @@ public final class TrackerService extends AbstractIdleService { @Override protected void shutDown() throws Exception { - serverChannel.close().await(); - bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await(); - bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await(); + channelGroup.close().awaitUninterruptibly(); + + List<Future<?>> futures = new ArrayList<>(); + futures.add(bootstrap.config().group().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)); + futures.add(bootstrap.config().childGroup().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)); + + for (Future<?> future : futures) { + future.awaitUninterruptibly(); + } LOG.info("Tracker service stopped at {}", url); }
