jojochuang commented on a change in pull request #3966:
URL: https://github.com/apache/hadoop/pull/3966#discussion_r812515540



##########
File path: hadoop-common-project/hadoop-common/pom.xml
##########
@@ -1014,6 +1050,36 @@
               </filesets>
             </configuration>
           </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>${maven-shade-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <configuration>
+                  <artifactSet>
+                    <includes>
+                      <include>io.netty:*</include>
+                    </includes>
+                    <excludes>
+                      <!-- this is netty 3 -->
+                      <exclude>*:netty</exclude>

Review comment:
       I don't think this is necessary anymore. Netty 3 is not in the current
hadoop-common dependency tree, and I expect we will remove the Netty 3
dependency altogether in Hadoop 3.4.0.

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
##########
@@ -4025,4 +4338,326 @@ protected int getMaxIdleTime() {
   public String getServerName() {
     return serverName;
   }
+
+  // avoid netty trying to "guess" an appropriate buffer size.
+  private static final RecvByteBufAllocator IPC_RECVBUF_ALLOCATOR =
+      new FixedRecvByteBufAllocator(NIO_BUFFER_LIMIT);
+
+  private class NettyListener implements Listener<io.netty.channel.Channel> {
+    private ServerBootstrap bootstrap;
+    private NettyThreadFactory listenerFactory;
+    private NettyThreadFactory readerFactory;
+    private ChannelGroup acceptChannels =
+        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    private int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+    private EventLoopGroup acceptors;
+    private EventLoopGroup readers;
+
+    NettyListener(int port) throws IOException {
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      Class<? extends io.netty.channel.socket.ServerSocketChannel> channelClass;
+      listenerFactory = new NettyThreadFactory("Netty Socket Acceptor", port);
+      readerFactory = new NettyThreadFactory("Netty Socket Reader", port);
+
+      // netty's readers double as responders so double the readers to
+      // compensate.
+      int numReaders = 2 * getNumReaders();
+      // Attempt to use native transport if available.
+      if (Epoll.isAvailable()) { // Linux.
+        channelClass = EpollServerSocketChannel.class;
+        acceptors = new EpollEventLoopGroup(1, listenerFactory);
+        readers = new EpollEventLoopGroup(numReaders, readerFactory);
+      } else if (KQueue.isAvailable()) { // OS X/BSD.
+        channelClass = KQueueServerSocketChannel.class;
+        acceptors = new KQueueEventLoopGroup(1, listenerFactory);
+        readers = new KQueueEventLoopGroup(numReaders, readerFactory);
+      } else {
+        channelClass = NioServerSocketChannel.class;
+        acceptors = new NioEventLoopGroup(1, listenerFactory);
+        readers = new NioEventLoopGroup(numReaders, readerFactory);
+      }
+      bootstrap = new ServerBootstrap()
+          .group(acceptors, readers)
+          .channel(channelClass)
+          .option(ChannelOption.SO_BACKLOG, backlogLength)

Review comment:
       Consider adding the SO_RCVBUF and SO_SNDBUF options, and making them
configurable.
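   
   A rough sketch of what that could look like, purely for illustration; the
configuration keys and helper class below are hypothetical, not existing Hadoop
names:
   
   ```java
   import io.netty.bootstrap.ServerBootstrap;
   import io.netty.channel.ChannelOption;
   import org.apache.hadoop.conf.Configuration;

   final class SocketBufferOptions {
     // Hypothetical keys for illustration only; not existing Hadoop config keys.
     static final String SO_RCVBUF_KEY = "ipc.server.netty.so-rcvbuf";
     static final String SO_SNDBUF_KEY = "ipc.server.netty.so-sndbuf";

     /** Apply configurable SO_RCVBUF/SO_SNDBUF to accepted child channels, if set. */
     static void apply(ServerBootstrap bootstrap, Configuration conf) {
       int rcvBuf = conf.getInt(SO_RCVBUF_KEY, 0);
       int sndBuf = conf.getInt(SO_SNDBUF_KEY, 0);
       if (rcvBuf > 0) {
         bootstrap.childOption(ChannelOption.SO_RCVBUF, rcvBuf);
       }
       if (sndBuf > 0) {
         bootstrap.childOption(ChannelOption.SO_SNDBUF, sndBuf);
       }
     }
   }
   ```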

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
##########
@@ -4025,4 +4338,326 @@ protected int getMaxIdleTime() {
   public String getServerName() {
     return serverName;
   }
+
+  // avoid netty trying to "guess" an appropriate buffer size.
+  private static final RecvByteBufAllocator IPC_RECVBUF_ALLOCATOR =
+      new FixedRecvByteBufAllocator(NIO_BUFFER_LIMIT);
+
+  private class NettyListener implements Listener<io.netty.channel.Channel> {
+    private ServerBootstrap bootstrap;
+    private NettyThreadFactory listenerFactory;
+    private NettyThreadFactory readerFactory;
+    private ChannelGroup acceptChannels =
+        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    private int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+    private EventLoopGroup acceptors;
+    private EventLoopGroup readers;
+
+    NettyListener(int port) throws IOException {
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      Class<? extends io.netty.channel.socket.ServerSocketChannel> channelClass;
+      listenerFactory = new NettyThreadFactory("Netty Socket Acceptor", port);
+      readerFactory = new NettyThreadFactory("Netty Socket Reader", port);
+
+      // netty's readers double as responders so double the readers to
+      // compensate.
+      int numReaders = 2 * getNumReaders();
+      // Attempt to use native transport if available.
+      if (Epoll.isAvailable()) { // Linux.

Review comment:
       Good, I'm happy to see this. There's about 5% more throughput when using
the Epoll/KQueue EventLoopGroup.

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
##########
@@ -1957,4 +1969,261 @@ public void close() {
       IOUtils.closeStream(in);
     }
   }
+
+  static class NioIpcStreams extends IpcStreams {
+    NioIpcStreams(Socket socket) throws IOException {
+      setInputStream(
+          new BufferedInputStream(NetUtils.getInputStream(socket)));
+      setOutputStream(
+          new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+    }
+    @Override
+    Future<?> submit(Runnable call) {
+      return Client.getClientExecutor().submit(call);
+    }
+  }
+
+  static class NettyIpcStreams extends IpcStreams {
+    private final EventLoopGroup group;
+    private io.netty.channel.Channel channel;
+    private int soTimeout;
+    private IOException channelIOE;
+
+    NettyIpcStreams(Socket socket) throws IOException {
+      soTimeout = socket.getSoTimeout();
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      channel = new NioSocketChannel(socket.getChannel());
+      channel.config().setAutoRead(false);
+
+      SslContext sslCtx = null;
+
+      try {
+        sslCtx = SslContextBuilder.forClient()
+            .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+      } catch (SSLException e) {
+        throw new IOException("Exception while building SSL Context", e);
+      }
+
+      SslHandler sslHandler = sslCtx.newHandler(channel.alloc());
+
+      if (sslHandler != null) {
+        sslHandler.handshakeFuture().addListener(
+            new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>() {
+          @Override
+          public void operationComplete(
+              final io.netty.util.concurrent.Future<Channel> handshakeFuture)
+              throws Exception {
+            if (handshakeFuture.isSuccess()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("TLS handshake success");
+              }
+            } else {
+              throw new IOException("TLS handshake failed." + handshakeFuture.cause());
+            }
+          }
+        });
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding the SSLHandler to the pipeline");
+      }
+      channel.pipeline().addLast("SSL", sslHandler);
+
+      RpcChannelHandler handler = new RpcChannelHandler();
+      setInputStream(new BufferedInputStream(handler.getInputStream()));
+      setOutputStream(new BufferedOutputStream(handler.getOutputStream()));
+      channel.pipeline().addLast(handler);
+      group = new NioEventLoopGroup(1);

Review comment:
       (it was for a totally different piece of work, so it may not be relevant
here, especially with SSL)

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
##########
@@ -4025,4 +4338,326 @@ protected int getMaxIdleTime() {
   public String getServerName() {
     return serverName;
   }
+
+  // avoid netty trying to "guess" an appropriate buffer size.
+  private static final RecvByteBufAllocator IPC_RECVBUF_ALLOCATOR =
+      new FixedRecvByteBufAllocator(NIO_BUFFER_LIMIT);
+
+  private class NettyListener implements Listener<io.netty.channel.Channel> {
+    private ServerBootstrap bootstrap;
+    private NettyThreadFactory listenerFactory;
+    private NettyThreadFactory readerFactory;
+    private ChannelGroup acceptChannels =
+        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    private int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+    private EventLoopGroup acceptors;
+    private EventLoopGroup readers;
+
+    NettyListener(int port) throws IOException {
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      Class<? extends io.netty.channel.socket.ServerSocketChannel> channelClass;
+      listenerFactory = new NettyThreadFactory("Netty Socket Acceptor", port);
+      readerFactory = new NettyThreadFactory("Netty Socket Reader", port);
+
+      // netty's readers double as responders so double the readers to
+      // compensate.
+      int numReaders = 2 * getNumReaders();
+      // Attempt to use native transport if available.
+      if (Epoll.isAvailable()) { // Linux.
+        channelClass = EpollServerSocketChannel.class;
+        acceptors = new EpollEventLoopGroup(1, listenerFactory);
+        readers = new EpollEventLoopGroup(numReaders, readerFactory);
+      } else if (KQueue.isAvailable()) { // OS X/BSD.
+        channelClass = KQueueServerSocketChannel.class;
+        acceptors = new KQueueEventLoopGroup(1, listenerFactory);
+        readers = new KQueueEventLoopGroup(numReaders, readerFactory);
+      } else {
+        channelClass = NioServerSocketChannel.class;
+        acceptors = new NioEventLoopGroup(1, listenerFactory);
+        readers = new NioEventLoopGroup(numReaders, readerFactory);
+      }
+      bootstrap = new ServerBootstrap()
+          .group(acceptors, readers)
+          .channel(channelClass)
+          .option(ChannelOption.SO_BACKLOG, backlogLength)
+          .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+          .childOption(ChannelOption.RCVBUF_ALLOCATOR, IPC_RECVBUF_ALLOCATOR)
+          .childOption(ChannelOption.SO_KEEPALIVE, true)
+          .childOption(ChannelOption.SO_REUSEADDR, true)
+          .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+          .childHandler(new ChannelInitializer<Channel>() {
+              @Override
+              protected void initChannel(io.netty.channel.Channel channel)
+                  throws IOException {
+                connectionManager.register(new NettyConnection(channel));
+              }});
+
+      InetSocketAddress address = new InetSocketAddress(bindAddress, port);
+      io.netty.channel.Channel channel = Server.bind(Binder.NETTY, bootstrap,
+          address, backlogLength, conf, portRangeConfig);
+      registerAcceptChannel(channel);
+      // It may have been an ephemeral port or port range bind, so update
+      // the thread factories to rename any already created threads.
+      port = ((InetSocketAddress)channel.localAddress()).getPort();
+      listenerFactory.updatePort(port);
+      readerFactory.updatePort(port);
+    }
+
+    @Override
+    public InetSocketAddress getAddress() {
+      return Server.this.getListenerAddress();
+    }
+
+    @Override
+    public void listen(InetSocketAddress addr) throws IOException {
+      registerAcceptChannel(Binder.NETTY.bind(bootstrap, addr, backlogLength));
+    }
+
+    @Override
+    public void registerAcceptChannel(io.netty.channel.Channel channel) {
+      acceptChannels.add(channel);
+      addListenerAddress((InetSocketAddress)channel.localAddress());
+    }
+
+    @Override
+    public void closeAcceptChannels() {
+      acceptChannels.close();
+    }
+
+    @Override
+    public void start() {
+      connectionManager.startIdleScan();
+    }
+
+    @Override
+    public void interrupt() {
+      doStop();
+    }
+
+    @Override
+    public void doStop() {
+      try {
+        //TODO : Add Boolean stopped to avoid double stoppage.
+
+        // closing will send events to the bootstrap's event loop groups.
+        closeAcceptChannels();
+        connectionManager.stopIdleScan();
+        connectionManager.closeAll();
+        // shutdown the event loops to reject all further events.
+        ServerBootstrapConfig config = bootstrap.config();
+        //config.group().shutdownGracefully().awaitUninterruptibly();
+        //config.childGroup().shutdownGracefully().awaitUninterruptibly();
+        acceptors.shutdownGracefully().awaitUninterruptibly();
+        readers.shutdownGracefully().awaitUninterruptibly();
+        listenerFactory.close();
+        readerFactory.close();
+      } catch (IOException ioe) {
+        LOG.warn("Unable to shutdown Netty listener and reader threads : ", 
ioe);
+      }
+      finally {
+        IOUtils.cleanupWithLogger(LOG, listenerFactory, readerFactory);
+      }
+    }
+  }
+
+  @ChannelHandler.Sharable
+  private class NettyResponder extends ChannelOutboundHandlerAdapter
+      implements Responder {
+    @Override
+    public void start() {}
+    @Override
+    public void interrupt() {}
+    // called by handlers.
+    // TODO: Is queuing required similar to the NioResponder implementation ?
+    @Override
+    public void doRespond(RpcCall call) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(Thread.currentThread().getName() +
+            ": responding to " + call);
+      }
+      NettyConnection connection = call.connection();

Review comment:
       We should also run the benchmark with -Dio.netty.leakDetectionLevel=advanced
to detect any potential memory leaks.
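   
   For reference, the same setting can also be applied programmatically; a
minimal sketch (the class name is made up for illustration, and normally the
JVM flag above is enough):
   
   ```java
   import io.netty.util.ResourceLeakDetector;

   public class LeakDetectionExample {
     public static void main(String[] args) {
       // Equivalent to -Dio.netty.leakDetectionLevel=advanced; must be set
       // before any pooled buffers are allocated to take effect for them.
       ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
       System.out.println("Netty leak detection level: "
           + ResourceLeakDetector.getLevel());
     }
   }
   ```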

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
##########
@@ -1957,4 +1969,261 @@ public void close() {
       IOUtils.closeStream(in);
     }
   }
+
+  static class NioIpcStreams extends IpcStreams {
+    NioIpcStreams(Socket socket) throws IOException {
+      setInputStream(
+          new BufferedInputStream(NetUtils.getInputStream(socket)));
+      setOutputStream(
+          new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+    }
+    @Override
+    Future<?> submit(Runnable call) {
+      return Client.getClientExecutor().submit(call);
+    }
+  }
+
+  static class NettyIpcStreams extends IpcStreams {
+    private final EventLoopGroup group;
+    private io.netty.channel.Channel channel;
+    private int soTimeout;
+    private IOException channelIOE;
+
+    NettyIpcStreams(Socket socket) throws IOException {
+      soTimeout = socket.getSoTimeout();
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      channel = new NioSocketChannel(socket.getChannel());
+      channel.config().setAutoRead(false);
+
+      SslContext sslCtx = null;
+
+      try {
+        sslCtx = SslContextBuilder.forClient()
+            .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+      } catch (SSLException e) {
+        throw new IOException("Exception while building SSL Context", e);
+      }
+
+      SslHandler sslHandler = sslCtx.newHandler(channel.alloc());
+
+      if (sslHandler != null) {
+        sslHandler.handshakeFuture().addListener(
+            new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>() {
+          @Override
+          public void operationComplete(
+              final io.netty.util.concurrent.Future<Channel> handshakeFuture)
+              throws Exception {
+            if (handshakeFuture.isSuccess()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("TLS handshake success");
+              }
+            } else {
+              throw new IOException("TLS handshake failed." + handshakeFuture.cause());
+            }
+          }
+        });
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding the SSLHandler to the pipeline");
+      }
+      channel.pipeline().addLast("SSL", sslHandler);
+
+      RpcChannelHandler handler = new RpcChannelHandler();
+      setInputStream(new BufferedInputStream(handler.getInputStream()));
+      setOutputStream(new BufferedOutputStream(handler.getOutputStream()));
+      channel.pipeline().addLast(handler);
+      group = new NioEventLoopGroup(1);

Review comment:
       We can consider supporting the Epoll/KQueue event loop group on the client
side too.
   
   I benchmarked NIO/Epoll performance at one point: the client-side Epoll/KQueue
event loop group has a notable 5% throughput improvement over NIO.
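   
   As a rough sketch (not the patch's code), the client-side transport selection
could mirror the server-side logic; the class name below is illustrative, and
note this only works if Netty creates the socket itself rather than wrapping a
pre-existing java.nio channel the way NettyIpcStreams currently does:
   
   ```java
   import io.netty.channel.EventLoopGroup;
   import io.netty.channel.epoll.Epoll;
   import io.netty.channel.epoll.EpollEventLoopGroup;
   import io.netty.channel.epoll.EpollSocketChannel;
   import io.netty.channel.kqueue.KQueue;
   import io.netty.channel.kqueue.KQueueEventLoopGroup;
   import io.netty.channel.kqueue.KQueueSocketChannel;
   import io.netty.channel.nio.NioEventLoopGroup;
   import io.netty.channel.socket.SocketChannel;
   import io.netty.channel.socket.nio.NioSocketChannel;

   final class ClientTransport {
     final EventLoopGroup group;
     final Class<? extends SocketChannel> channelClass;

     ClientTransport() {
       if (Epoll.isAvailable()) {          // Linux native transport.
         group = new EpollEventLoopGroup(1);
         channelClass = EpollSocketChannel.class;
       } else if (KQueue.isAvailable()) {  // macOS/BSD native transport.
         group = new KQueueEventLoopGroup(1);
         channelClass = KQueueSocketChannel.class;
       } else {                            // Portable NIO fallback.
         group = new NioEventLoopGroup(1);
         channelClass = NioSocketChannel.class;
       }
     }
   }
   ```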

##########
File path: hadoop-common-project/hadoop-common/pom.xml
##########
@@ -1014,6 +1050,36 @@
               </filesets>
             </configuration>
           </plugin>
+          <plugin>

Review comment:
       I don't remember why this requires shading Netty, although I do think
shading Netty is a good idea after going through the Netty 4.0 -> 4.1 upgrade
exercise.
   
   However, shading the Netty classes inside hadoop-common makes it impossible
for the rest of Hadoop to use the shaded Netty. I guess we want to move the
shaded Netty to hadoop-thirdparty in the long run. Projects like Tez and Ratis
use the Hadoop RPC code, so we want to tread carefully for compatibility.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
