[ 
https://issues.apache.org/jira/browse/HADOOP-11245?focusedWorklogId=581671&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-581671
 ]

ASF GitHub Bot logged work on HADOOP-11245:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/21 10:21
            Start Date: 13/Apr/21 10:21
    Worklog Time Spent: 10m 
      Work Description: szetszwo commented on a change in pull request #2832:
URL: https://github.com/apache/hadoop/pull/2832#discussion_r612218129



##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
##########
@@ -62,75 +64,84 @@ public static FrameDecoder constructRpcFrameDecoder() {
    * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
    * each RPC client.
    */
-  static class RpcFrameDecoder extends FrameDecoder {
+  static class RpcFrameDecoder extends ByteToMessageDecoder {
     public static final Logger LOG =
         LoggerFactory.getLogger(RpcFrameDecoder.class);
-    private ChannelBuffer currentFrame;
+    private boolean isLast;

Review comment:
       isLast should be volatile.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
##########
@@ -48,40 +47,42 @@ public SimpleTcpClient(String host, int port, XDR request, 
Boolean oneShot) {
     this.request = request;
     this.oneShot = oneShot;
   }
-  
-  protected ChannelPipelineFactory setPipelineFactory() {
-    this.pipelineFactory = new ChannelPipelineFactory() {
+
+  protected ChannelInitializer<SocketChannel> setChannelHandler() {
+    return new ChannelInitializer<SocketChannel>() {
       @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(
+      protected void initChannel(SocketChannel ch) throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(
             RpcUtil.constructRpcFrameDecoder(),
-            new SimpleTcpClientHandler(request));
+            new SimpleTcpClientHandler(request)
+        );
       }
     };
-    return this.pipelineFactory;
   }
 
   public void run() {
     // Configure the client.
-    ChannelFactory factory = new NioClientSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 
1);
-    ClientBootstrap bootstrap = new ClientBootstrap(factory);
-
-    // Set up the pipeline factory.
-    bootstrap.setPipelineFactory(setPipelineFactory());
+    NioEventLoopGroup workerGroup = new NioEventLoopGroup();

Review comment:
       Is run() only used in unit tests?  If not, workerGroup needs to be 
shutdown when oneShot == false.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
##########
@@ -48,40 +47,42 @@ public SimpleTcpClient(String host, int port, XDR request, 
Boolean oneShot) {
     this.request = request;
     this.oneShot = oneShot;
   }
-  
-  protected ChannelPipelineFactory setPipelineFactory() {
-    this.pipelineFactory = new ChannelPipelineFactory() {
+
+  protected ChannelInitializer<SocketChannel> setChannelHandler() {
+    return new ChannelInitializer<SocketChannel>() {
       @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(
+      protected void initChannel(SocketChannel ch) throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(
             RpcUtil.constructRpcFrameDecoder(),
-            new SimpleTcpClientHandler(request));
+            new SimpleTcpClientHandler(request)
+        );
       }
     };
-    return this.pipelineFactory;
   }
 
   public void run() {
     // Configure the client.
-    ChannelFactory factory = new NioClientSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 
1);
-    ClientBootstrap bootstrap = new ClientBootstrap(factory);
-
-    // Set up the pipeline factory.
-    bootstrap.setPipelineFactory(setPipelineFactory());
+    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+    Bootstrap bootstrap = new Bootstrap()
+        .group(workerGroup)
+        .channel(NioSocketChannel.class);
 
-    bootstrap.setOption("tcpNoDelay", true);
-    bootstrap.setOption("keepAlive", true);
+    try {
+      ChannelFuture future = bootstrap.handler(setChannelHandler())
+          .option(ChannelOption.TCP_NODELAY, true)
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .connect(new InetSocketAddress(host, port)).sync();
 
-    // Start the connection attempt.
-    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, 
port));
+      if (oneShot) {
+        // Wait until the connection is closed or the connection attempt fails.
+        future.channel().closeFuture().sync();
 
-    if (oneShot) {
-      // Wait until the connection is closed or the connection attempt fails.
-      future.getChannel().getCloseFuture().awaitUninterruptibly();
-
-      // Shut down thread pools to exit.
-      bootstrap.releaseExternalResources();
+        // Shut down thread pools to exit.
+        workerGroup.shutdownGracefully();

Review comment:
       This should be moved to a finally block.  Otherwise, it won't be 
shutdown in case of exceptions.

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
##########
@@ -19,15 +19,33 @@
 
 import java.net.SocketAddress;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.DefaultAddressedEnvelope;
 
 /**
  * RpcResponse encapsulates a response to a RPC request. It contains the data
  * that is going to cross the wire, as well as the information of the remote
  * peer.
  */
-public class RpcResponse {
-  private final ChannelBuffer data;
+public class RpcResponse extends
+    DefaultAddressedEnvelope<ByteBuf, SocketAddress> {
+  public RpcResponse(ByteBuf message, SocketAddress recipient) {
+    super(message, recipient, null);
+  }
+
+  public RpcResponse(ByteBuf message, SocketAddress recipient,
+      SocketAddress sender) {
+    super(message, recipient, sender);
+  }
+
+  public ByteBuf data() {
+    return this.content();
+  }
+
+  public SocketAddress remoteAddress() {
+    return this.recipient();
+  }
+  /*private final ChannelBuffer data;

Review comment:
       Let's remove the old code?

##########
File path: 
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
##########
@@ -39,27 +37,27 @@ public SimpleTcpClientHandler(XDR request) {
   }
 
   @Override
-  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) 
{
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
     // Send the request
     if (LOG.isDebugEnabled()) {
       LOG.debug("sending PRC request");
     }
-    ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
-    e.getChannel().write(outBuf);
+    ByteBuf outBuf = XDR.writeMessageTcp(request, true);
+    ctx.channel().writeAndFlush(outBuf);
   }
 
   /**
    * Shutdown connection by default. Subclass can override this method to do
    * more interaction with the server.
    */
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-    e.getChannel().close();
+  public void channelRead(ChannelHandlerContext ctx, Object msg) {
+    ctx.channel().closeFuture().awaitUninterruptibly();

Review comment:
       If should call close() instead of closeFuture() since closeFuture() just 
returns the future but won't close the channel.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 581671)
    Time Spent: 1h 50m  (was: 1h 40m)

> Update NFS gateway to use Netty4
> --------------------------------
>
>                 Key: HADOOP-11245
>                 URL: https://issues.apache.org/jira/browse/HADOOP-11245
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: nfs
>            Reporter: Brandon Li
>            Assignee: Wei-Chiu Chuang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to