szetszwo commented on a change in pull request #2832:
URL: https://github.com/apache/hadoop/pull/2832#discussion_r613193567



##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
##########
@@ -59,35 +64,41 @@ public SimpleTcpServer(int port, RpcProgram program, int 
workercount) {
 
   public void run() {
     // Configure the Server.
-    ChannelFactory factory;
+    bossGroup = new NioEventLoopGroup();
+
     if (workerCount == 0) {
       // Use default workers: 2 * the number of available processors
-      factory = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+      workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
     } else {
-      factory = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-          workerCount);
+      workerGroup = new NioEventLoopGroup(workerCount, 
Executors.newCachedThreadPool());
     }

Review comment:
       The if-else is no longer need.  Simply pass workerCount for both cases 
as below.
   ```
         workerGroup = new NioEventLoopGroup(workerCount, 
Executors.newCachedThreadPool());
   ```

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
##########
@@ -39,9 +42,11 @@
       LoggerFactory.getLogger(SimpleTcpServer.class);
   protected final int port;
   protected int boundPort = -1; // Will be set after server starts
-  protected final SimpleChannelUpstreamHandler rpcProgram;
+  protected final ChannelInboundHandlerAdapter rpcProgram;
   private ServerBootstrap server;
   private Channel ch;
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;

Review comment:
       Should the non-final fields (boundPort, server, ch, bossGroup, 
workerGroup) be volatile?  Not sure if they are synchronized in somewhere.
   

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
##########
@@ -242,26 +242,26 @@ public static boolean verifyLength(XDR xdr, int len) {
    * @param last specifies last request or not
    * @return TCP buffer
    */
-  public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+  public static ByteBuf writeMessageTcp(XDR request, boolean last) {
     Preconditions.checkState(request.state == XDR.State.WRITING);
     ByteBuffer b = request.buf.duplicate();
     b.flip();
     byte[] fragmentHeader = XDR.recordMark(b.limit(), last);
     ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
 
     // TODO: Investigate whether making a copy of the buffer is necessary.
-    return ChannelBuffers.copiedBuffer(headerBuf, b);
+    return Unpooled.wrappedBuffer(headerBuf, b);

Review comment:
       The new code uses wrappedBuffer but old code uses copiedBuffer.  Is it 
intended?

##########
File path: hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
##########
@@ -15,4 +15,4 @@ log4j.rootLogger=info,stdout
 log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n

Review comment:
       Let's revert this whitespace change in order to keep log4j.properties 
untouched.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
##########
@@ -92,6 +92,7 @@ public void testRegistration() throws IOException, 
InterruptedException {
     DatagramPacket p = new DatagramPacket(reqBuf, reqBuf.length,
         pm.getUdpServerLoAddress());
     try {
+

Review comment:
       Let's revert this whitespace change.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
##########
@@ -84,8 +97,6 @@ public void shutdown() {
     if (ch != null) {
       ch.close().awaitUninterruptibly();
     }
-    if (server != null) {
-      server.releaseExternalResources();
-    }
+    workerGroup.shutdownGracefully();

Review comment:
       Check null.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
##########
@@ -95,36 +108,59 @@ RpcProgramPortmap getHandler() {
   void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
       final SocketAddress udpAddress) {
 
-    tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-    tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
-      private final HashedWheelTimer timer = new HashedWheelTimer();
-      private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-          timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
-            RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
-            RpcUtil.STAGE_RPC_TCP_RESPONSE);
-      }
-    });
-    tcpServer.setOption("reuseAddress", true);
-    tcpServer.setOption("child.reuseAddress", true);
-
-    udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
-        Executors.newCachedThreadPool()));
-
-    udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
-        handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
-    udpServer.setOption("reuseAddress", true);
-
-    tcpChannel = tcpServer.bind(tcpAddress);
-    udpChannel = udpServer.bind(udpAddress);
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    tcpServer = new ServerBootstrap();
+    tcpServer.group(bossGroup, workerGroup)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<SocketChannel>() {
+          private final IdleStateHandler idleStateHandler = new 
IdleStateHandler(
+              0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
+
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline p = ch.pipeline();
+
+            p.addLast(RpcUtil.constructRpcFrameDecoder(),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_TCP_RESPONSE);
+          }});
+
+    udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    udpServer = new Bootstrap();
+    udpServer.group(udpGroup)
+        .channel(NioDatagramChannel.class)
+        .handler(new ChannelInitializer<NioDatagramChannel>() {
+          @Override protected void initChannel(NioDatagramChannel ch)
+              throws Exception {
+            ChannelPipeline p = ch.pipeline();
+            p.addLast(
+                new LoggingHandler(LogLevel.DEBUG),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, 
RpcUtil.STAGE_RPC_UDP_RESPONSE);
+          }
+        })
+        .option(ChannelOption.SO_REUSEADDR, true);
+
+    ChannelFuture tcpChannelFuture = null;
+    try {
+      tcpChannelFuture = tcpServer.bind(tcpAddress).sync();
+      tcpChannel = tcpChannelFuture.channel();
+
+      ChannelFuture udpChannelFuture = udpServer.bind(udpAddress).sync();
+      udpChannel = udpChannelFuture.channel();

Review comment:
       Call bind for both channels first and then sync.
   ```
         ChannelFuture tcpChannelFuture = tcpServer.bind(tcpAddress);
         ChannelFuture udpChannelFuture = udpServer.bind(udpAddress);
         tcpChannel = tcpChannelFuture.sync().channel();
         udpChannel = udpChannelFuture.sync().channel();
   ```
   

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
##########
@@ -60,6 +60,8 @@
   private final ChannelGroup allChannels;
 
   RpcProgramPortmap(ChannelGroup allChannels) {
+    super(1, 1, 1);
+    // FIXME: set default idle timeout 1 second.

Review comment:
       Why setting timeout to 1 second?  Is this for debugging?

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
##########
@@ -95,36 +108,59 @@ RpcProgramPortmap getHandler() {
   void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
       final SocketAddress udpAddress) {
 
-    tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-    tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
-      private final HashedWheelTimer timer = new HashedWheelTimer();
-      private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-          timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
-            RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
-            RpcUtil.STAGE_RPC_TCP_RESPONSE);
-      }
-    });
-    tcpServer.setOption("reuseAddress", true);
-    tcpServer.setOption("child.reuseAddress", true);
-
-    udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
-        Executors.newCachedThreadPool()));
-
-    udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
-        handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
-    udpServer.setOption("reuseAddress", true);
-
-    tcpChannel = tcpServer.bind(tcpAddress);
-    udpChannel = udpServer.bind(udpAddress);
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    tcpServer = new ServerBootstrap();
+    tcpServer.group(bossGroup, workerGroup)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<SocketChannel>() {
+          private final IdleStateHandler idleStateHandler = new 
IdleStateHandler(
+              0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
+
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline p = ch.pipeline();
+
+            p.addLast(RpcUtil.constructRpcFrameDecoder(),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_TCP_RESPONSE);
+          }});
+
+    udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    udpServer = new Bootstrap();
+    udpServer.group(udpGroup)
+        .channel(NioDatagramChannel.class)
+        .handler(new ChannelInitializer<NioDatagramChannel>() {
+          @Override protected void initChannel(NioDatagramChannel ch)
+              throws Exception {
+            ChannelPipeline p = ch.pipeline();
+            p.addLast(
+                new LoggingHandler(LogLevel.DEBUG),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, 
RpcUtil.STAGE_RPC_UDP_RESPONSE);
+          }
+        })
+        .option(ChannelOption.SO_REUSEADDR, true);
+
+    ChannelFuture tcpChannelFuture = null;
+    try {
+      tcpChannelFuture = tcpServer.bind(tcpAddress).sync();
+      tcpChannel = tcpChannelFuture.channel();
+
+      ChannelFuture udpChannelFuture = udpServer.bind(udpAddress).sync();
+      udpChannel = udpChannelFuture.channel();
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();

Review comment:
       Should it re-throws the exception?  Otherwise, the code below 
referencing tcpChannel/udpChannel may throw NullPointerException.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
##########
@@ -103,8 +114,8 @@ public void shutdown() {
     if (ch != null) {
       ch.close().awaitUninterruptibly();
     }
-    if (server != null) {
-      server.releaseExternalResources();
-    }
+
+    workerGroup.shutdownGracefully();
+    bossGroup.shutdownGracefully();

Review comment:
       Check null, i.e.
   ```
       if (workerGroup != null) {
         workerGroup.shutdownGracefully();
       }
       if (bossGroup != null) {
         bossGroup.shutdownGracefully();
       }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to