This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new df4812df65d HADOOP-18569. NFS Gateway may release buffer too early (#5207)
df4812df65d is described below

commit df4812df65d01889ba93bce1415e01461500208d
Author: Doroszlai, Attila <6454655+adorosz...@users.noreply.github.com>
AuthorDate: Mon Dec 12 19:02:49 2022 +0100

    HADOOP-18569. NFS Gateway may release buffer too early (#5207)
---
 .../src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java  | 13 +++++++++++--
 .../src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java     |  8 +++++---
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
index 8b8d558255f..cff6693f6aa 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
@@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
@@ -163,8 +164,16 @@ public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
   public void channelRead(ChannelHandlerContext ctx, Object msg)
       throws Exception {
     RpcInfo info = (RpcInfo) msg;
+    try {
+      channelRead(ctx, info);
+    } finally {
+      ReferenceCountUtil.release(info.data());
+    }
+  }
+
+  private void channelRead(ChannelHandlerContext ctx, RpcInfo info)
+      throws Exception {
     RpcCall call = (RpcCall) info.header();
-    
     SocketAddress remoteAddress = info.remoteAddress();
     if (LOG.isTraceEnabled()) {
       LOG.trace(program + " procedure #" + call.getProcedure());
@@ -256,4 +265,4 @@ public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
   public int getPortmapUdpTimeoutMillis() {
     return portmapUdpTimeoutMillis;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
index e8bc27d687f..784e8c79618 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
@@ -129,15 +129,17 @@ public final class RpcUtil {
       RpcInfo info = null;
       try {
         RpcCall callHeader = RpcCall.read(in);
-        ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
-            .slice());
+        ByteBuf dataBuffer = buf.slice(b.position(), b.remaining());
 
         info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
             remoteAddress);
       } catch (Exception exc) {
         LOG.info("Malformed RPC request from " + remoteAddress);
       } finally {
-        buf.release();
+        // only release buffer if it is not passed to downstream handler
+        if (info == null) {
+          buf.release();
+        }
       }
 
       if (info != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to