This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4f1b42a7c5 [ISSUE #7410] Handle the Exception when the Proxy requests the client
4f1b42a7c5 is described below

commit 4f1b42a7c5557bcadd6b9982a0c9bd876622c69e
Author: ShuangxiDing <[email protected]>
AuthorDate: Thu Sep 28 16:52:02 2023 +0800

    [ISSUE #7410] Handle the Exception when the Proxy requests the client
    
    Co-authored-by: 徒钟 <[email protected]>
---
 .../proxy/remoting/channel/RemotingChannel.java    | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 40946cabf8..d755fdcc49 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import 
org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
@@ -158,10 +159,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
                     if (response.getCode() == ResponseCode.SUCCESS) {
                         ConsumerRunningInfo consumerRunningInfo = 
ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
                         responseFuture.complete(new 
ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo));
+                    } else {
+                        String errMsg = String.format("get consumer running 
info failed, code:%s remark:%s", response.getCode(), response.getRemark());
+                        RuntimeException e = new RuntimeException(errMsg);
+                        responseFuture.completeExceptionally(e);
                     }
-                    String errMsg = String.format("get consumer running info 
failed, code:%s remark:%s", response.getCode(), response.getRemark());
-                    RuntimeException e = new RuntimeException(errMsg);
-                    responseFuture.completeExceptionally(e);
+                })
+                .exceptionally(t -> {
+                    
responseFuture.completeExceptionally(ExceptionUtils.getRealException(t));
+                    return null;
                 });
             return CompletableFuture.completedFuture(null);
         } catch (Throwable t) {
@@ -183,10 +189,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
                     if (response.getCode() == ResponseCode.SUCCESS) {
                         ConsumeMessageDirectlyResult result = 
ConsumeMessageDirectlyResult.decode(response.getBody(), 
ConsumeMessageDirectlyResult.class);
                         responseFuture.complete(new 
ProxyRelayResult<>(ResponseCode.SUCCESS, "", result));
+                    } else {
+                        String errMsg = String.format("consume message 
directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
+                        RuntimeException e = new RuntimeException(errMsg);
+                        responseFuture.completeExceptionally(e);
                     }
-                    String errMsg = String.format("consume message directly 
failed, code:%s remark:%s", response.getCode(), response.getRemark());
-                    RuntimeException e = new RuntimeException(errMsg);
-                    responseFuture.completeExceptionally(e);
+                })
+                .exceptionally(t -> {
+                    
responseFuture.completeExceptionally(ExceptionUtils.getRealException(t));
+                    return null;
                 });
             return CompletableFuture.completedFuture(null);
         } catch (Throwable t) {

Reply via email to