This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 9823a6157e7 HBASE-28101 Addendum do not throw EOFException out
directly (#5431)
9823a6157e7 is described below
commit 9823a6157e7c9c5561a38d702bc68f429a85afd5
Author: Duo Zhang <[email protected]>
AuthorDate: Sun Sep 24 16:14:40 2023 +0800
HBASE-28101 Addendum do not throw EOFException out directly (#5431)
Signed-off-by: Nihal Jain <[email protected]>
(cherry picked from commit 4b76a95e032a0426f34a979dd605913ee8bb8d2c)
---
.../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 65 ++++++++++++++--------
1 file changed, 41 insertions(+), 24 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 34869314bab..47b0b29a5c6 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -126,6 +126,37 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
}
}
+ private void finishCall(ResponseHeader responseHeader, ByteBufInputStream
in, Call call)
+ throws IOException {
+ Message value;
+ if (call.responseDefaultType != null) {
+ Message.Builder builder = call.responseDefaultType.newBuilderForType();
+ if (!builder.mergeDelimitedFrom(in)) {
+ // The javadoc of mergeDelimitedFrom says returning false means the
stream reaches EOF
+ // before reading any bytes out, so here we need to manually finish
create the EOFException
+ // and finish the call
+ call.setException(new EOFException("EOF while reading response with
type: "
+ + call.responseDefaultType.getClass().getName()));
+ return;
+ }
+ value = builder.build();
+ } else {
+ value = null;
+ }
+ CellScanner cellBlockScanner;
+ if (responseHeader.hasCellBlockMeta()) {
+ int size = responseHeader.getCellBlockMeta().getLength();
+ // Maybe we could read directly from the ByteBuf.
+ // The problem here is that we do not know when to release it.
+ byte[] cellBlock = new byte[size];
+ in.readFully(cellBlock);
+ cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec,
this.compressor, cellBlock);
+ } else {
+ cellBlockScanner = null;
+ }
+ call.setResponse(value, cellBlockScanner);
+ }
+
private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws
IOException {
int totalSize = buf.readInt();
ByteBufInputStream in = new ByteBufInputStream(buf);
@@ -166,31 +197,17 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
call.setException(remoteExc);
return;
}
- Message value;
- if (call.responseDefaultType != null) {
- Message.Builder builder = call.responseDefaultType.newBuilderForType();
- if (!builder.mergeDelimitedFrom(in)) {
- // The javadoc of mergeDelimitedFrom says returning false means the
stream reaches EOF
- // before reading any bytes out, so here we need to manually throw the
EOFException out
- throw new EOFException(
- "EOF while reading response with type: " +
call.responseDefaultType.getClass().getName());
- }
- value = builder.build();
- } else {
- value = null;
- }
- CellScanner cellBlockScanner;
- if (responseHeader.hasCellBlockMeta()) {
- int size = responseHeader.getCellBlockMeta().getLength();
- // Maybe we could read directly from the ByteBuf.
- // The problem here is that we do not know when to release it.
- byte[] cellBlock = new byte[size];
- buf.readBytes(cellBlock);
- cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec,
this.compressor, cellBlock);
- } else {
- cellBlockScanner = null;
+ try {
+ finishCall(responseHeader, in, call);
+ } catch (IOException e) {
+ // As the call has been removed from id2Call map, if we hit an exception
here, the
+ // exceptionCaught method can not help us finish the call, so here we
need to catch the
+ // exception and finish it
+ // And in netty, the decoding the frame based, when reaching here we
have already read a full
+ // frame, so hitting exception here does not mean the stream decoding is
broken, thus we do
+ // not need to throw the exception out and close the connection.
+ call.setException(e);
}
- call.setResponse(value, cellBlockScanner);
}
@Override