This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 9823a6157e7 HBASE-28101 Addendum do not throw EOFException out 
directly (#5431)
9823a6157e7 is described below

commit 9823a6157e7c9c5561a38d702bc68f429a85afd5
Author: Duo Zhang <[email protected]>
AuthorDate: Sun Sep 24 16:14:40 2023 +0800

    HBASE-28101 Addendum do not throw EOFException out directly (#5431)
    
    Signed-off-by: Nihal Jain <[email protected]>
    (cherry picked from commit 4b76a95e032a0426f34a979dd605913ee8bb8d2c)
---
 .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java    | 65 ++++++++++++++--------
 1 file changed, 41 insertions(+), 24 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 34869314bab..47b0b29a5c6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -126,6 +126,37 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
     }
   }
 
+  private void finishCall(ResponseHeader responseHeader, ByteBufInputStream 
in, Call call)
+    throws IOException {
+    Message value;
+    if (call.responseDefaultType != null) {
+      Message.Builder builder = call.responseDefaultType.newBuilderForType();
+      if (!builder.mergeDelimitedFrom(in)) {
+        // The javadoc of mergeDelimitedFrom says returning false means the 
stream reaches EOF
+        // before reading any bytes out, so here we need to manually 
create the EOFException
+        // and finish the call
+        call.setException(new EOFException("EOF while reading response with 
type: "
+          + call.responseDefaultType.getClass().getName()));
+        return;
+      }
+      value = builder.build();
+    } else {
+      value = null;
+    }
+    CellScanner cellBlockScanner;
+    if (responseHeader.hasCellBlockMeta()) {
+      int size = responseHeader.getCellBlockMeta().getLength();
+      // Maybe we could read directly from the ByteBuf.
+      // The problem here is that we do not know when to release it.
+      byte[] cellBlock = new byte[size];
+      in.readFully(cellBlock);
+      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, 
this.compressor, cellBlock);
+    } else {
+      cellBlockScanner = null;
+    }
+    call.setResponse(value, cellBlockScanner);
+  }
+
   private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws 
IOException {
     int totalSize = buf.readInt();
     ByteBufInputStream in = new ByteBufInputStream(buf);
@@ -166,31 +197,17 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
       call.setException(remoteExc);
       return;
     }
-    Message value;
-    if (call.responseDefaultType != null) {
-      Message.Builder builder = call.responseDefaultType.newBuilderForType();
-      if (!builder.mergeDelimitedFrom(in)) {
-        // The javadoc of mergeDelimitedFrom says returning false means the 
stream reaches EOF
-        // before reading any bytes out, so here we need to manually throw the 
EOFException out
-        throw new EOFException(
-          "EOF while reading response with type: " + 
call.responseDefaultType.getClass().getName());
-      }
-      value = builder.build();
-    } else {
-      value = null;
-    }
-    CellScanner cellBlockScanner;
-    if (responseHeader.hasCellBlockMeta()) {
-      int size = responseHeader.getCellBlockMeta().getLength();
-      // Maybe we could read directly from the ByteBuf.
-      // The problem here is that we do not know when to release it.
-      byte[] cellBlock = new byte[size];
-      buf.readBytes(cellBlock);
-      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, 
this.compressor, cellBlock);
-    } else {
-      cellBlockScanner = null;
+    try {
+      finishCall(responseHeader, in, call);
+    } catch (IOException e) {
+      // As the call has been removed from id2Call map, if we hit an exception 
here, the
+      // exceptionCaught method can not help us finish the call, so here we 
need to catch the
+      // exception and finish it
+      // And in netty, the decoding is frame based; when reaching here we 
have already read a full
+      // frame, so hitting exception here does not mean the stream decoding is 
broken, thus we do
+      // not need to throw the exception out and close the connection.
+      call.setException(e);
     }
-    call.setResponse(value, cellBlockScanner);
   }
 
   @Override

Reply via email to