vnhive commented on a change in pull request #3966:
URL: https://github.com/apache/hadoop/pull/3966#discussion_r813515305



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
##########
@@ -4025,4 +4338,326 @@ protected int getMaxIdleTime() {
   public String getServerName() {
     return serverName;
   }
+
+  // avoid netty trying to "guess" an appropriate buffer size.
+  private static final RecvByteBufAllocator IPC_RECVBUF_ALLOCATOR =
+      new FixedRecvByteBufAllocator(NIO_BUFFER_LIMIT);
+
+  private class NettyListener implements Listener<io.netty.channel.Channel> {
+    private ServerBootstrap bootstrap;
+    private NettyThreadFactory listenerFactory;
+    private NettyThreadFactory readerFactory;
+    private ChannelGroup acceptChannels =
+        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    private int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+    private EventLoopGroup acceptors;
+    private EventLoopGroup readers;
+
+    NettyListener(int port) throws IOException {
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      Class<? extends io.netty.channel.socket.ServerSocketChannel> 
channelClass;
+      listenerFactory = new NettyThreadFactory("Netty Socket Acceptor", port);
+      readerFactory = new NettyThreadFactory("Netty Socket Reader", port);
+
+      // netty's readers double as responders so double the readers to
+      // compensate.
+      int numReaders = 2 * getNumReaders();
+      // Attempt to use native transport if available.
+      if (Epoll.isAvailable()) { // Linux.
+        channelClass = EpollServerSocketChannel.class;
+        acceptors = new EpollEventLoopGroup(1, listenerFactory);
+        readers = new EpollEventLoopGroup(numReaders, readerFactory);
+      } else if (KQueue.isAvailable()) { // OS X/BSD.
+        channelClass = KQueueServerSocketChannel.class;
+        acceptors = new KQueueEventLoopGroup(1, listenerFactory);
+        readers = new KQueueEventLoopGroup(numReaders, readerFactory);
+      } else {
+        channelClass = NioServerSocketChannel.class;
+        acceptors = new NioEventLoopGroup(1, listenerFactory);
+        readers = new NioEventLoopGroup(numReaders, readerFactory);
+      }
+      bootstrap = new ServerBootstrap()
+          .group(acceptors, readers)
+          .channel(channelClass)
+          .option(ChannelOption.SO_BACKLOG, backlogLength)
+          .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+          .childOption(ChannelOption.RCVBUF_ALLOCATOR, IPC_RECVBUF_ALLOCATOR)
+          .childOption(ChannelOption.SO_KEEPALIVE, true)
+          .childOption(ChannelOption.SO_REUSEADDR, true)
+          .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+          .childHandler(new ChannelInitializer<Channel>() {
+              @Override
+              protected void initChannel(io.netty.channel.Channel channel)
+                  throws IOException {
+                connectionManager.register(new NettyConnection(channel));
+              }});
+
+      InetSocketAddress address = new InetSocketAddress(bindAddress, port);
+      io.netty.channel.Channel channel = Server.bind(Binder.NETTY, bootstrap,
+          address, backlogLength, conf, portRangeConfig);
+      registerAcceptChannel(channel);
+      // If may have been an ephemeral port or port range bind, so update
+      // the thread factories to rename any already created threads.
+      port = ((InetSocketAddress)channel.localAddress()).getPort();
+      listenerFactory.updatePort(port);
+      readerFactory.updatePort(port);
+    }
+
+    @Override
+    public InetSocketAddress getAddress() {
+      return Server.this.getListenerAddress();
+    }
+
+    @Override
+    public void listen(InetSocketAddress addr) throws IOException {
+      registerAcceptChannel(Binder.NETTY.bind(bootstrap, addr, backlogLength));
+    }
+
+    @Override
+    public void registerAcceptChannel(io.netty.channel.Channel channel) {
+      acceptChannels.add(channel);
+      addListenerAddress((InetSocketAddress)channel.localAddress());
+    }
+
+    @Override
+    public void closeAcceptChannels() {
+      acceptChannels.close();
+    }
+
+    @Override
+    public void start() {
+      connectionManager.startIdleScan();
+    }
+
+    @Override
+    public void interrupt() {
+      doStop();
+    }
+
+    @Override
+    public void doStop() {
+      try {
+        //TODO : Add Boolean stopped to avoid double stoppage.
+
+        // closing will send events to the bootstrap's event loop groups.
+        closeAcceptChannels();
+        connectionManager.stopIdleScan();
+        connectionManager.closeAll();
+        // shutdown the event loops to reject all further events.
+        ServerBootstrapConfig config = bootstrap.config();
+        //config.group().shutdownGracefully().awaitUninterruptibly();
+        //config.childGroup().shutdownGracefully().awaitUninterruptibly();
+        acceptors.shutdownGracefully().awaitUninterruptibly();
+        readers.shutdownGracefully().awaitUninterruptibly();
+        listenerFactory.close();
+        readerFactory.close();
+      } catch (IOException ioe) {
+        LOG.warn("Unable to shutdown Netty listener and reader threads : ", 
ioe);
+      }
+      finally {
+        IOUtils.cleanupWithLogger(LOG, listenerFactory, readerFactory);
+      }
+    }
+  }
+
+  @ChannelHandler.Sharable
+  private class NettyResponder extends ChannelOutboundHandlerAdapter
+      implements Responder {
+    @Override
+    public void start() {}
+    @Override
+    public void interrupt() {}
+    // called by handlers.
+    // TODO: Is queuing required similar to the NioResponder implementation ?
+    @Override
+    public void doRespond(RpcCall call) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(Thread.currentThread().getName() +
+            ": responding to " + call);
+      }
+      NettyConnection connection = call.connection();

Review comment:
       Sure, Thank you for the pointer, I will look into this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to