vnhive commented on a change in pull request #3966:
URL: https://github.com/apache/hadoop/pull/3966#discussion_r813514721



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
##########
@@ -4025,4 +4338,326 @@ protected int getMaxIdleTime() {
   public String getServerName() {
     return serverName;
   }
+
+  // avoid netty trying to "guess" an appropriate buffer size.
+  private static final RecvByteBufAllocator IPC_RECVBUF_ALLOCATOR =
+      new FixedRecvByteBufAllocator(NIO_BUFFER_LIMIT);
+
+  private class NettyListener implements Listener<io.netty.channel.Channel> {
+    private ServerBootstrap bootstrap;
+    private NettyThreadFactory listenerFactory;
+    private NettyThreadFactory readerFactory;
+    private ChannelGroup acceptChannels =
+        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    private int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+    private EventLoopGroup acceptors;
+    private EventLoopGroup readers;
+
+    NettyListener(int port) throws IOException {
+      if (!LOG.isDebugEnabled()) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+      }
+      Class<? extends io.netty.channel.socket.ServerSocketChannel> channelClass;
+      listenerFactory = new NettyThreadFactory("Netty Socket Acceptor", port);
+      readerFactory = new NettyThreadFactory("Netty Socket Reader", port);
+
+      // netty's readers double as responders so double the readers to
+      // compensate.
+      int numReaders = 2 * getNumReaders();
+      // Attempt to use native transport if available.
+      if (Epoll.isAvailable()) { // Linux.
+        channelClass = EpollServerSocketChannel.class;
+        acceptors = new EpollEventLoopGroup(1, listenerFactory);
+        readers = new EpollEventLoopGroup(numReaders, readerFactory);
+      } else if (KQueue.isAvailable()) { // OS X/BSD.
+        channelClass = KQueueServerSocketChannel.class;
+        acceptors = new KQueueEventLoopGroup(1, listenerFactory);
+        readers = new KQueueEventLoopGroup(numReaders, readerFactory);
+      } else {
+        channelClass = NioServerSocketChannel.class;
+        acceptors = new NioEventLoopGroup(1, listenerFactory);
+        readers = new NioEventLoopGroup(numReaders, readerFactory);
+      }
+      bootstrap = new ServerBootstrap()
+          .group(acceptors, readers)
+          .channel(channelClass)
+          .option(ChannelOption.SO_BACKLOG, backlogLength)

Review comment:
       Will do. I will update my next PR with this change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to