Repository: twill Updated Branches: refs/heads/site 49f35954d -> 2a97588cc
(TWILL-248) Upgrade to use Netty-4.1 - Also enable ResourceReportClient to use HTTP compression Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d6b3882 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d6b3882 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d6b3882 Branch: refs/heads/site Commit: 6d6b3882a631e2260865f641674ee362c52fd1a0 Parents: f34a39a Author: Terence Yim <[email protected]> Authored: Tue Oct 10 13:26:11 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Mon Oct 16 09:10:18 2017 -0700 ---------------------------------------------------------------------- pom.xml | 14 +- twill-core/pom.xml | 10 +- .../internal/appmaster/TrackerService.java | 217 ++++++++++--------- .../apache/twill/yarn/ResourceReportClient.java | 36 ++- 4 files changed, 159 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a6aa474..573936d 100644 --- a/pom.xml +++ b/pom.xml @@ -162,7 +162,7 @@ <guava.version>13.0.1</guava.version> <gson.version>2.2.4</gson.version> <findbugs.jsr305.version>2.0.1</findbugs.jsr305.version> - <netty.version>3.6.6.Final</netty.version> + <netty.version>4.1.16.Final</netty.version> <snappy-java.version>1.0.5</snappy-java.version> <jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version> <asm.version>5.0.2</asm.version> @@ -777,7 +777,17 @@ </dependency> <dependency> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-buffer</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> <version>${netty.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-core/pom.xml ---------------------------------------------------------------------- diff --git a/twill-core/pom.xml b/twill-core/pom.xml index f265f26..4bee172 100644 --- a/twill-core/pom.xml +++ b/twill-core/pom.xml @@ -55,7 +55,15 @@ </dependency> <dependency> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-buffer</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> </dependency> <dependency> <groupId>org.xerial.snappy</groupId> http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java index f91efcc..bb8cf57 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java @@ -20,38 +20,36 @@ package org.apache.twill.internal.appmaster; import com.google.common.base.Supplier; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContentCompressor; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; import org.apache.twill.api.ResourceReport; import org.apache.twill.internal.json.ResourceReportAdapter; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpChunkAggregator; -import org.jboss.netty.handler.codec.http.HttpContentCompressor; -import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.codec.http.HttpVersion; -import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +60,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -78,14 +74,15 @@ public final class TrackerService extends AbstractIdleService { private static final Logger LOG = LoggerFactory.getLogger(TrackerService.class); private static final int NUM_BOSS_THREADS = 1; + private static final int NUM_WORKER_THREADS = 10; private static final int CLOSE_CHANNEL_TIMEOUT = 5; private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024; private final Supplier<ResourceReport> resourceReport; - private final ChannelGroup channelGroup; private String host; private ServerBootstrap bootstrap; + private Channel serverChannel; private InetSocketAddress bindAddress; private URL url; @@ -95,7 +92,6 @@ public final class TrackerService extends AbstractIdleService { * @param resourceReport live report that the service will return to clients. */ TrackerService(Supplier<ResourceReport> resourceReport) { - this.channelGroup = new DefaultChannelGroup("appMasterTracker"); this.resourceReport = resourceReport; } @@ -123,52 +119,40 @@ public final class TrackerService extends AbstractIdleService { @Override protected void startUp() throws Exception { - Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("boss-thread") - .build()); - - Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("worker-thread#%d") - .build()); - - ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads); - - bootstrap = new ServerBootstrap(factory); - - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() { - ChannelPipeline pipeline = Channels.pipeline(); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("compressor", new HttpContentCompressor()); - pipeline.addLast("handler", new ReportHandler()); - - return pipeline; - } - }); - - Channel channel = bootstrap.bind(new InetSocketAddress(host, 0)); - bindAddress = (InetSocketAddress) channel.getLocalAddress(); + EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS, + new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("boss-thread").build()); + EventLoopGroup workerGroup = new NioEventLoopGroup(NUM_WORKER_THREADS, + new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("worker-thread#%d").build()); + + bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("codec", new HttpServerCodec()); + pipeline.addLast("compressor", new HttpContentCompressor()); + pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_INPUT_SIZE)); + pipeline.addLast("handler", new ReportHandler()); + } + }); + + serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel(); + bindAddress = (InetSocketAddress) serverChannel.localAddress(); url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL(); - channelGroup.add(channel); LOG.info("Tracker service started at {}", url); } @Override protected void shutDown() throws Exception { - try { - if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) { - LOG.warn("Timeout when closing all channels."); - } - } finally { - bootstrap.releaseExternalResources(); - } + serverChannel.close().await(); + bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await(); + bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await(); + LOG.info("Tracker service stopped at {}", url); } @@ -176,7 +160,7 @@ public final class TrackerService extends AbstractIdleService { * Handler to return resources used by this application master, which will be available through * the host and port set when this application master registered itself to the resource manager. */ - final class ReportHandler extends SimpleChannelUpstreamHandler { + final class ReportHandler extends ChannelInboundHandlerAdapter { private final ResourceReportAdapter reportAdapter; ReportHandler() { @@ -184,51 +168,68 @@ public final class TrackerService extends AbstractIdleService { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != HttpMethod.GET) { - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED); - response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.wrappedBuffer("Only GET is supported".getBytes(StandardCharsets.UTF_8))); - writeResponse(e.getChannel(), response); - return; - } - - if (!PATH.equals(request.getUri())) { - // Redirect all GET call to the /resources path. - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT); - response.setHeader(HttpHeaders.Names.LOCATION, PATH); - writeResponse(e.getChannel(), response); - return; + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + if (!(msg instanceof HttpRequest)) { + // Ignore if it is not HttpRequest + return; + } + + HttpRequest request = (HttpRequest) msg; + if (!HttpMethod.GET.equals(request.method())) { + FullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED, + Unpooled.copiedBuffer("Only GET is supported", StandardCharsets.UTF_8)); + + HttpUtil.setContentLength(response, response.content().readableBytes()); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); + writeAndClose(ctx.channel(), response); + return; + } + + if (!PATH.equals(request.uri())) { + // Redirect all GET call to the /resources path. + HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, + HttpResponseStatus.TEMPORARY_REDIRECT); + HttpUtil.setContentLength(response, 0); + response.headers().set(HttpHeaderNames.LOCATION, PATH); + writeAndClose(ctx.channel(), response); + return; + } + + writeResourceReport(ctx.channel()); + } finally { + ReferenceCountUtil.release(msg); } + } - writeResourceReport(e.getChannel()); + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.channel().close(); } private void writeResourceReport(Channel channel) { - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8"); - - ChannelBuffer content = ChannelBuffers.dynamicBuffer(); - Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8); - reportAdapter.toJson(resourceReport.get(), writer); + ByteBuf content = Unpooled.buffer(); + Writer writer = new OutputStreamWriter(new ByteBufOutputStream(content), CharsetUtil.UTF_8); try { + reportAdapter.toJson(resourceReport.get(), writer); writer.close(); - } catch (IOException e1) { - LOG.error("error writing resource report", e1); + } catch (IOException e) { + LOG.error("error writing resource report", e); + writeAndClose(channel, new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, + Unpooled.copiedBuffer(e.getMessage(), StandardCharsets.UTF_8))); + return; } - response.setContent(content); - writeResponse(channel, response); - } - private void writeResponse(Channel channel, HttpResponse response) { - ChannelFuture future = channel.write(response); - future.addListener(ChannelFutureListener.CLOSE); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); + HttpUtil.setContentLength(response, content.readableBytes()); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8"); + channel.writeAndFlush(response); } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - e.getChannel().close(); + private void writeAndClose(Channel channel, HttpResponse response) { + channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } } } http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java index fb8b7e8..4676751 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java @@ -17,19 +17,21 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; -import com.google.common.io.Closeables; import org.apache.twill.api.ResourceReport; import org.apache.twill.internal.json.ResourceReportAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.zip.DeflaterInputStream; +import java.util.zip.GZIPInputStream; /** * Package private class to get {@link ResourceReport} from the application master. @@ -52,12 +54,16 @@ final class ResourceReportClient { public ResourceReport get() { for (URL url : resourceUrls) { try { - Reader reader = new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8)); - try { + HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); + urlConn.setRequestProperty("Accept-Encoding", "gzip, deflate"); + + if (urlConn.getResponseCode() != 200) { + continue; + } + + try (Reader reader = new InputStreamReader(getInputStream(urlConn), StandardCharsets.UTF_8)) { LOG.trace("Report returned by {}", url); return reportAdapter.fromJson(reader); - } finally { - Closeables.closeQuietly(reader); } } catch (IOException e) { // Just log a trace as it's ok to not able to fetch resource report @@ -66,4 +72,20 @@ final class ResourceReportClient { } return null; } + + private InputStream getInputStream(HttpURLConnection urlConn) throws IOException { + InputStream is = urlConn.getInputStream(); + String contentEncoding = urlConn.getContentEncoding(); + if (contentEncoding == null) { + return is; + } + if ("gzip".equalsIgnoreCase(contentEncoding)) { + return new GZIPInputStream(is); + } + if ("deflate".equalsIgnoreCase(contentEncoding)) { + return new DeflaterInputStream(is); + } + // This should never happen + throw new IOException("Unsupported content encoding " + contentEncoding); + } }
