This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 14969aa  feat(broker) prevent blocked by group transfer service
     new dd822ae  Merge pull request #1881 from duhenglucky/issue_1879
14969aa is described below

commit 14969aa389f1c87d362504a3899c19ed37a3ee51
Author: duhenglucky <[email protected]>
AuthorDate: Tue Mar 24 12:08:34 2020 +0800

    feat(broker) prevent blocked by group transfer service
---
 .../src/main/java/org/apache/rocketmq/broker/BrokerController.java   | 5 +++--
 .../org/apache/rocketmq/broker/processor/SendMessageProcessor.java   | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 85009d6..194f285 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1240,6 +1240,7 @@ public class BrokerController {
         }
     }
 
-
-
+    public ExecutorService getSendMessageExecutor() {
+        return sendMessageExecutor;
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 801d886..4dc311d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -79,7 +79,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
 
     @Override
     public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemotingResponseCallback responseCallback) throws Exception {
-        asyncProcessRequest(ctx, 
request).thenAccept(responseCallback::callback);
+        asyncProcessRequest(ctx, 
request).thenAcceptAsync(responseCallback::callback, 
this.brokerController.getSendMessageExecutor());
     }
 
     public CompletableFuture<RemotingCommand> 
asyncProcessRequest(ChannelHandlerContext ctx,

Reply via email to