jojochuang commented on a change in pull request #2832:
URL: https://github.com/apache/hadoop/pull/2832#discussion_r615684655



##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
##########
@@ -95,36 +108,59 @@ RpcProgramPortmap getHandler() {
   void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
       final SocketAddress udpAddress) {
 
-    tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-    tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
-      private final HashedWheelTimer timer = new HashedWheelTimer();
-      private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-          timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
-            RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
-            RpcUtil.STAGE_RPC_TCP_RESPONSE);
-      }
-    });
-    tcpServer.setOption("reuseAddress", true);
-    tcpServer.setOption("child.reuseAddress", true);
-
-    udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
-        Executors.newCachedThreadPool()));
-
-    udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
-        handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
-    udpServer.setOption("reuseAddress", true);
-
-    tcpChannel = tcpServer.bind(tcpAddress);
-    udpChannel = udpServer.bind(udpAddress);
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    tcpServer = new ServerBootstrap();
+    tcpServer.group(bossGroup, workerGroup)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<SocketChannel>() {
+          private final IdleStateHandler idleStateHandler = new 
IdleStateHandler(
+              0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
+
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline p = ch.pipeline();
+
+            p.addLast(RpcUtil.constructRpcFrameDecoder(),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_TCP_RESPONSE);
+          }});
+
+    udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    udpServer = new Bootstrap();
+    udpServer.group(udpGroup)
+        .channel(NioDatagramChannel.class)
+        .handler(new ChannelInitializer<NioDatagramChannel>() {
+          @Override protected void initChannel(NioDatagramChannel ch)
+              throws Exception {
+            ChannelPipeline p = ch.pipeline();
+            p.addLast(
+                new LoggingHandler(LogLevel.DEBUG),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, 
RpcUtil.STAGE_RPC_UDP_RESPONSE);
+          }
+        })
+        .option(ChannelOption.SO_REUSEADDR, true);
+
+    ChannelFuture tcpChannelFuture = null;
+    try {
+      tcpChannelFuture = tcpServer.bind(tcpAddress).sync();
+      tcpChannel = tcpChannelFuture.channel();
+
+      ChannelFuture udpChannelFuture = udpServer.bind(udpAddress).sync();
+      udpChannel = udpChannelFuture.channel();
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();

Review comment:
       make sense.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
##########
@@ -242,26 +242,26 @@ public static boolean verifyLength(XDR xdr, int len) {
    * @param last specifies last request or not
    * @return TCP buffer
    */
-  public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+  public static ByteBuf writeMessageTcp(XDR request, boolean last) {
     Preconditions.checkState(request.state == XDR.State.WRITING);
     ByteBuffer b = request.buf.duplicate();
     b.flip();
     byte[] fragmentHeader = XDR.recordMark(b.limit(), last);
     ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
 
     // TODO: Investigate whether making a copy of the buffer is necessary.
-    return ChannelBuffers.copiedBuffer(headerBuf, b);
+    return Unpooled.wrappedBuffer(headerBuf, b);

Review comment:
       Intentional. As suspected in the TODO above, it seems wrapping instead 
of copying the buffer does work.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
##########
@@ -39,9 +42,11 @@
       LoggerFactory.getLogger(SimpleTcpServer.class);
   protected final int port;
   protected int boundPort = -1; // Will be set after server starts
-  protected final SimpleChannelUpstreamHandler rpcProgram;
+  protected final ChannelInboundHandlerAdapter rpcProgram;
   private ServerBootstrap server;
   private Channel ch;
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;

Review comment:
       these are all private members. Only the shutdown() and run() methods 
touch them, and both methods are called by the same thread. So given the 
current usage volatile isn't necessary.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
##########
@@ -60,6 +60,8 @@
   private final ChannelGroup allChannels;
 
   RpcProgramPortmap(ChannelGroup allChannels) {
+    super(1, 1, 1);
+    // FIXME: set default idle timeout 1 second.

Review comment:
       So this one I am not 100% sure. Without calling super(), the default is 
no idle timeout (not sure about the default behavior in netty3), 
TestPortmap#testIdle() will fail. So I have to set a timeout. If I set a long 
timeout like 60 seconds,  TestPortmap#testIdle() will fail too. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to