Apache9 commented on code in PR #5631:
URL: https://github.com/apache/hbase/pull/5631#discussion_r1477027843


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java:
##########
@@ -127,88 +120,15 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
     }
   }
 
-  private void finishCall(ResponseHeader responseHeader, ByteBufInputStream 
in, Call call)
-    throws IOException {
-    Message value;
-    if (call.responseDefaultType != null) {
-      Message.Builder builder = call.responseDefaultType.newBuilderForType();
-      if (!builder.mergeDelimitedFrom(in)) {
-        // The javadoc of mergeDelimitedFrom says returning false means the 
stream reaches EOF
-        // before reading any bytes out, so here we need to manually finish 
create the EOFException
-        // and finish the call
-        call.setException(new EOFException("EOF while reading response with 
type: "
-          + call.responseDefaultType.getClass().getName()));
-        return;
-      }
-      value = builder.build();
-    } else {
-      value = null;
-    }
-    CellScanner cellBlockScanner;
-    if (responseHeader.hasCellBlockMeta()) {
-      int size = responseHeader.getCellBlockMeta().getLength();
-      // Maybe we could read directly from the ByteBuf.
-      // The problem here is that we do not know when to release it.
-      byte[] cellBlock = new byte[size];
-      in.readFully(cellBlock);
-      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, 
this.compressor, cellBlock);
-    } else {
-      cellBlockScanner = null;
-    }
-    call.setResponse(value, cellBlockScanner);
-  }
-
   private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws 
IOException {
-    int totalSize = buf.readInt();
-    ByteBufInputStream in = new ByteBufInputStream(buf);
-    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
-    int id = responseHeader.getCallId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("got response header " + 
TextFormat.shortDebugString(responseHeader)
-        + ", totalSize: " + totalSize + " bytes");
-    }
-    RemoteException remoteExc;
-    if (responseHeader.hasException()) {
-      ExceptionResponse exceptionResponse = responseHeader.getException();
-      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
-      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
-        // Here we will cleanup all calls so do not need to fall back, just 
return.
-        exceptionCaught(ctx, remoteExc);
-        return;
-      }
-    } else {
-      remoteExc = null;
-    }
-    Call call = id2Call.remove(id);
-    if (call == null) {
-      // So we got a response for which we have no corresponding 'call' here 
on the client-side.
-      // We probably timed out waiting, cleaned up all references, and now the 
server decides
-      // to return a response. There is nothing we can do w/ the response at 
this stage. Clean
-      // out the wire of the response so its out of the way and we can get 
other responses on
-      // this connection.
-      if (LOG.isDebugEnabled()) {
-        int readSoFar = 
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
-        int whatIsLeftToRead = totalSize - readSoFar;
-        LOG.debug("Unknown callId: " + id + ", skipping over this response of 
" + whatIsLeftToRead
-          + " bytes");
-      }
-      return;
-    }
-    call.callStats.setResponseSizeBytes(totalSize);
-    if (remoteExc != null) {
-      call.setException(remoteExc);
-      return;
-    }
     try {
-      finishCall(responseHeader, in, call);
+      conn.readResponse(new ByteBufInputStream(buf), id2Call,
+        remoteExc -> exceptionCaught(ctx, remoteExc));
     } catch (IOException e) {
-      // As the call has been removed from id2Call map, if we hit an exception 
here, the
-      // exceptionCaught method can not help us finish the call, so here we 
need to catch the
-      // exception and finish it
-      // And in netty, the decoding the frame based, when reaching here we 
have already read a full
+      // In netty, the decoding the frame based, when reaching here we have 
already read a full
       // frame, so hitting exception here does not mean the stream decoding is 
broken, thus we do
       // not need to throw the exception out and close the connection.
-      call.setException(e);
+      LOG.warn("failed to process response", e);

Review Comment:
   This is already done in the RpcConnection.readResponse method; you can check 
the code there. It catches the exception thrown by the finishCall method, calls 
the setException method, and then rethrows the exception. For 
BlockingRpcClient we may close the connection, while for netty we just ignore 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to