This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 00d8a46cd5 support catch actual exception (#12446)
00d8a46cd5 is described below

commit 00d8a46cd54dff3687f32ade67c0bcddc3285e07
Author: icodening <[email protected]>
AuthorDate: Wed Jun 7 11:01:33 2023 +0800

    support catch actual exception (#12446)
---
 .../rpc/protocol/tri/stream/TripleClientStream.java    |  3 +--
 .../rpc/protocol/tri/transport/TripleWriteQueue.java   | 18 ++++++++++++++----
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index ab5816a34a..99f993566d 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -120,7 +120,6 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
                     Channel channel = ctx.channel();
                     channel.pipeline().addLast(new 
TripleCommandOutBoundHandler());
                     channel.pipeline().addLast(new 
TripleHttp2ClientResponseHandler(createTransportListener()));
-                    channel.closeFuture().addListener(f -> 
transportException(f.cause()));
                 }
             });
         CreateStreamQueueCommand cmd = 
CreateStreamQueueCommand.create(bootstrap, streamChannelFuture);
@@ -149,7 +148,7 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
     private void transportException(Throwable cause) {
         final TriRpcStatus status = 
TriRpcStatus.INTERNAL.withDescription("Http2 exception")
             .withCause(cause);
-        listener.onComplete(status, null);
+        listener.onComplete(status, null, null, false);
     }
 
     public ChannelFuture cancelByLocal(TriRpcStatus status) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
index acbe859267..c659e9e708 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelPromise;
 import org.apache.dubbo.common.BatchExecutorQueue;
 import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
 
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 public class TripleWriteQueue extends BatchExecutorQueue<QueuedCommand> {
@@ -55,13 +56,22 @@ public class TripleWriteQueue extends 
BatchExecutorQueue<QueuedCommand> {
 
     @Override
     protected void prepare(QueuedCommand item) {
-        item.run(item.channel());
+        try {
+            Channel channel = item.channel();
+            item.run(channel);
+        } catch (CompletionException e) {
+            item.promise().tryFailure(e.getCause());
+        }
     }
 
     @Override
     protected void flush(QueuedCommand item) {
-        Channel channel = item.channel();
-        item.run(channel);
-        channel.flush();
+        try {
+            Channel channel = item.channel();
+            item.run(channel);
+            channel.flush();
+        } catch (CompletionException e) {
+            item.promise().tryFailure(e.getCause());
+        }
     }
 }

Reply via email to