This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 00d8a46cd5 support catch actual exception (#12446)
00d8a46cd5 is described below
commit 00d8a46cd54dff3687f32ade67c0bcddc3285e07
Author: icodening <[email protected]>
AuthorDate: Wed Jun 7 11:01:33 2023 +0800
support catch actual exception (#12446)
---
.../rpc/protocol/tri/stream/TripleClientStream.java | 3 +--
.../rpc/protocol/tri/transport/TripleWriteQueue.java | 18 ++++++++++++++----
2 files changed, 15 insertions(+), 6 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index ab5816a34a..99f993566d 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -120,7 +120,6 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
Channel channel = ctx.channel();
channel.pipeline().addLast(new
TripleCommandOutBoundHandler());
channel.pipeline().addLast(new
TripleHttp2ClientResponseHandler(createTransportListener()));
- channel.closeFuture().addListener(f ->
transportException(f.cause()));
}
});
CreateStreamQueueCommand cmd =
CreateStreamQueueCommand.create(bootstrap, streamChannelFuture);
@@ -149,7 +148,7 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
private void transportException(Throwable cause) {
final TriRpcStatus status =
TriRpcStatus.INTERNAL.withDescription("Http2 exception")
.withCause(cause);
- listener.onComplete(status, null);
+ listener.onComplete(status, null, null, false);
}
public ChannelFuture cancelByLocal(TriRpcStatus status) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
index acbe859267..c659e9e708 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelPromise;
import org.apache.dubbo.common.BatchExecutorQueue;
import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
public class TripleWriteQueue extends BatchExecutorQueue<QueuedCommand> {
@@ -55,13 +56,22 @@ public class TripleWriteQueue extends
BatchExecutorQueue<QueuedCommand> {
@Override
protected void prepare(QueuedCommand item) {
- item.run(item.channel());
+ try {
+ Channel channel = item.channel();
+ item.run(channel);
+ } catch (CompletionException e) {
+ item.promise().tryFailure(e.getCause());
+ }
}
@Override
protected void flush(QueuedCommand item) {
- Channel channel = item.channel();
- item.run(channel);
- channel.flush();
+ try {
+ Channel channel = item.channel();
+ item.run(channel);
+ channel.flush();
+ } catch (CompletionException e) {
+ item.promise().tryFailure(e.getCause());
+ }
}
}