Repository: twill
Updated Branches:
  refs/heads/master f34a39af0 -> 6d6b3882a


(TWILL-248) Upgrade to use Netty-4.1

- Also enable ResourceReportClient to use HTTP compression

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d6b3882
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d6b3882
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d6b3882

Branch: refs/heads/master
Commit: 6d6b3882a631e2260865f641674ee362c52fd1a0
Parents: f34a39a
Author: Terence Yim <[email protected]>
Authored: Tue Oct 10 13:26:11 2017 -0700
Committer: Terence Yim <[email protected]>
Committed: Mon Oct 16 09:10:18 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  14 +-
 twill-core/pom.xml                              |  10 +-
 .../internal/appmaster/TrackerService.java      | 217 ++++++++++---------
 .../apache/twill/yarn/ResourceReportClient.java |  36 ++-
 4 files changed, 159 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6aa474..573936d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@
         <guava.version>13.0.1</guava.version>
         <gson.version>2.2.4</gson.version>
         <findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
-        <netty.version>3.6.6.Final</netty.version>
+        <netty.version>4.1.16.Final</netty.version>
         <snappy-java.version>1.0.5</snappy-java.version>
         <jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version>
         <asm.version>5.0.2</asm.version>
@@ -777,7 +777,17 @@
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
-                <artifactId>netty</artifactId>
+                <artifactId>netty-buffer</artifactId>
+                <version>${netty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-codec-http</artifactId>
+                <version>${netty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-handler</artifactId>
                 <version>${netty.version}</version>
             </dependency>
             <dependency>

http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index f265f26..4bee172 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -55,7 +55,15 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
         </dependency>
         <dependency>
             <groupId>org.xerial.snappy</groupId>

http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index f91efcc..bb8cf57 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -20,38 +20,36 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.internal.json.ResourceReportAdapter;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,8 +60,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -78,14 +74,15 @@ public final class TrackerService extends AbstractIdleService {
 
   private static final Logger LOG  = 
LoggerFactory.getLogger(TrackerService.class);
   private static final int NUM_BOSS_THREADS = 1;
+  private static final int NUM_WORKER_THREADS = 10;
   private static final int CLOSE_CHANNEL_TIMEOUT = 5;
   private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
 
   private final Supplier<ResourceReport> resourceReport;
-  private final ChannelGroup channelGroup;
 
   private String host;
   private ServerBootstrap bootstrap;
+  private Channel serverChannel;
   private InetSocketAddress bindAddress;
   private URL url;
 
@@ -95,7 +92,6 @@ public final class TrackerService extends AbstractIdleService {
    * @param resourceReport live report that the service will return to clients.
    */
   TrackerService(Supplier<ResourceReport> resourceReport) {
-    this.channelGroup = new DefaultChannelGroup("appMasterTracker");
     this.resourceReport = resourceReport;
   }
 
@@ -123,52 +119,40 @@ public final class TrackerService extends AbstractIdleService {
 
   @Override
   protected void startUp() throws Exception {
-    Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
-                                                        new 
ThreadFactoryBuilder()
-                                                          .setDaemon(true)
-                                                          
.setNameFormat("boss-thread")
-                                                          .build());
-
-    Executor workerThreads = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder()
-                                                             .setDaemon(true)
-                                                             
.setNameFormat("worker-thread#%d")
-                                                             .build());
-
-    ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, 
workerThreads);
-
-    bootstrap = new ServerBootstrap(factory);
-
-    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      public ChannelPipeline getPipeline() {
-        ChannelPipeline pipeline = Channels.pipeline();
-
-        pipeline.addLast("decoder", new HttpRequestDecoder());
-        pipeline.addLast("aggregator", new 
HttpChunkAggregator(MAX_INPUT_SIZE));
-        pipeline.addLast("encoder", new HttpResponseEncoder());
-        pipeline.addLast("compressor", new HttpContentCompressor());
-        pipeline.addLast("handler", new ReportHandler());
-
-        return pipeline;
-      }
-    });
-
-    Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
-    bindAddress = (InetSocketAddress) channel.getLocalAddress();
+    EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
+                                                     new ThreadFactoryBuilder()
+                                                       
.setDaemon(true).setNameFormat("boss-thread").build());
+    EventLoopGroup workerGroup = new NioEventLoopGroup(NUM_WORKER_THREADS,
+                                                       new 
ThreadFactoryBuilder()
+                                                         
.setDaemon(true).setNameFormat("worker-thread#%d").build());
+
+    bootstrap = new ServerBootstrap()
+      .group(bossGroup, workerGroup)
+      .channel(NioServerSocketChannel.class)
+      .childHandler(new ChannelInitializer<SocketChannel>() {
+        @Override
+        protected void initChannel(SocketChannel ch) throws Exception {
+          ChannelPipeline pipeline = ch.pipeline();
+          pipeline.addLast("codec", new HttpServerCodec());
+          pipeline.addLast("compressor", new HttpContentCompressor());
+          pipeline.addLast("aggregator", new 
HttpObjectAggregator(MAX_INPUT_SIZE));
+          pipeline.addLast("handler", new ReportHandler());
+        }
+      });
+
+    serverChannel = bootstrap.bind(new InetSocketAddress(host, 
0)).sync().channel();
+    bindAddress = (InetSocketAddress) serverChannel.localAddress();
     url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
-    channelGroup.add(channel);
 
     LOG.info("Tracker service started at {}", url);
   }
 
   @Override
   protected void shutDown() throws Exception {
-    try {
-      if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, 
TimeUnit.SECONDS)) {
-        LOG.warn("Timeout when closing all channels.");
-      }
-    } finally {
-      bootstrap.releaseExternalResources();
-    }
+    serverChannel.close().await();
+    bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, 
TimeUnit.SECONDS).await();
+    bootstrap.config().childGroup().shutdownGracefully(1, 
CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+
     LOG.info("Tracker service stopped at {}", url);
   }
 
@@ -176,7 +160,7 @@ public final class TrackerService extends AbstractIdleService {
    * Handler to return resources used by this application master, which will 
be available through
    * the host and port set when this application master registered itself to 
the resource manager.
    */
-  final class ReportHandler extends SimpleChannelUpstreamHandler {
+  final class ReportHandler extends ChannelInboundHandlerAdapter {
     private final ResourceReportAdapter reportAdapter;
 
     ReportHandler() {
@@ -184,51 +168,68 @@ public final class TrackerService extends AbstractIdleService {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 
throws Exception {
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != HttpMethod.GET) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.METHOD_NOT_ALLOWED);
-        response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; 
charset=UTF-8");
-        response.setContent(ChannelBuffers.wrappedBuffer("Only GET is 
supported".getBytes(StandardCharsets.UTF_8)));
-        writeResponse(e.getChannel(), response);
-        return;
-      }
-
-      if (!PATH.equals(request.getUri())) {
-        // Redirect all GET call to the /resources path.
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.TEMPORARY_REDIRECT);
-        response.setHeader(HttpHeaders.Names.LOCATION, PATH);
-        writeResponse(e.getChannel(), response);
-        return;
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+      try {
+        if (!(msg instanceof HttpRequest)) {
+          // Ignore if it is not HttpRequest
+          return;
+        }
+
+        HttpRequest request = (HttpRequest) msg;
+        if (!HttpMethod.GET.equals(request.method())) {
+          FullHttpResponse response = new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED,
+            Unpooled.copiedBuffer("Only GET is supported", 
StandardCharsets.UTF_8));
+
+          HttpUtil.setContentLength(response, 
response.content().readableBytes());
+          response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; 
charset=UTF-8");
+          writeAndClose(ctx.channel(), response);
+          return;
+        }
+
+        if (!PATH.equals(request.uri())) {
+          // Redirect all GET call to the /resources path.
+          HttpResponse response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+                                                              
HttpResponseStatus.TEMPORARY_REDIRECT);
+          HttpUtil.setContentLength(response, 0);
+          response.headers().set(HttpHeaderNames.LOCATION, PATH);
+          writeAndClose(ctx.channel(), response);
+          return;
+        }
+
+        writeResourceReport(ctx.channel());
+      } finally {
+        ReferenceCountUtil.release(msg);
       }
+    }
 
-      writeResourceReport(e.getChannel());
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+      ctx.channel().close();
     }
 
     private void writeResourceReport(Channel channel) {
-      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.OK);
-      response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; 
charset=UTF-8");
-
-      ChannelBuffer content = ChannelBuffers.dynamicBuffer();
-      Writer writer = new OutputStreamWriter(new 
ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
-      reportAdapter.toJson(resourceReport.get(), writer);
+      ByteBuf content = Unpooled.buffer();
+      Writer writer = new OutputStreamWriter(new ByteBufOutputStream(content), 
CharsetUtil.UTF_8);
       try {
+        reportAdapter.toJson(resourceReport.get(), writer);
         writer.close();
-      } catch (IOException e1) {
-        LOG.error("error writing resource report", e1);
+      } catch (IOException e) {
+        LOG.error("error writing resource report", e);
+        writeAndClose(channel, new DefaultFullHttpResponse(
+          HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR,
+          Unpooled.copiedBuffer(e.getMessage(), StandardCharsets.UTF_8)));
+        return;
       }
-      response.setContent(content);
-      writeResponse(channel, response);
-    }
 
-    private void writeResponse(Channel channel, HttpResponse response) {
-      ChannelFuture future = channel.write(response);
-      future.addListener(ChannelFutureListener.CLOSE);
+      FullHttpResponse response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
+      HttpUtil.setContentLength(response, content.readableBytes());
+      response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=UTF-8");
+      channel.writeAndFlush(response);
     }
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      e.getChannel().close();
+    private void writeAndClose(Channel channel, HttpResponse response) {
+      channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
index fb8b7e8..4676751 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
@@ -17,19 +17,21 @@
  */
 package org.apache.twill.yarn;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Closeables;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.internal.json.ResourceReportAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.zip.DeflaterInputStream;
+import java.util.zip.GZIPInputStream;
 
 /**
  * Package private class to get {@link ResourceReport} from the application 
master.
@@ -52,12 +54,16 @@ final class ResourceReportClient {
   public ResourceReport get() {
     for (URL url : resourceUrls) {
       try {
-        Reader reader = new BufferedReader(new 
InputStreamReader(url.openStream(), Charsets.UTF_8));
-        try {
+        HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
+        urlConn.setRequestProperty("Accept-Encoding", "gzip, deflate");
+
+        if (urlConn.getResponseCode() != 200) {
+          continue;
+        }
+
+        try (Reader reader = new InputStreamReader(getInputStream(urlConn), 
StandardCharsets.UTF_8)) {
           LOG.trace("Report returned by {}", url);
           return reportAdapter.fromJson(reader);
-        } finally {
-          Closeables.closeQuietly(reader);
         }
       } catch (IOException e) {
         // Just log a trace as it's ok to not able to fetch resource report
@@ -66,4 +72,30 @@ final class ResourceReportClient {
     }
     return null;
   }
+
+  /**
+   * Opens the response body of the given connection, decoding it according to the
+   * {@code Content-Encoding} response header.
+   *
+   * @param urlConn connection whose response body is to be read
+   * @return the raw body stream when no content encoding is reported; otherwise a stream that
+   *         decompresses {@code gzip} or {@code deflate} content
+   * @throws IOException if the body cannot be opened or the content encoding is not supported
+   */
+  private InputStream getInputStream(HttpURLConnection urlConn) throws IOException {
+    InputStream is = urlConn.getInputStream();
+    String contentEncoding = urlConn.getContentEncoding();
+    if (contentEncoding == null) {
+      return is;
+    }
+    if ("gzip".equalsIgnoreCase(contentEncoding)) {
+      return new GZIPInputStream(is);
+    }
+    if ("deflate".equalsIgnoreCase(contentEncoding)) {
+      // Decoding needs an inflater; DeflaterInputStream would compress the bytes a second time.
+      return new java.util.zip.InflaterInputStream(is);
+    }
+    // This should never happen
+    throw new IOException("Unsupported content encoding " + contentEncoding);
+  }
 }

Reply via email to