This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 14969aa feat(broker) prevent blocked by group transfer service
new dd822ae Merge pull request #1881 from duhenglucky/issue_1879
14969aa is described below
commit 14969aa389f1c87d362504a3899c19ed37a3ee51
Author: duhenglucky <[email protected]>
AuthorDate: Tue Mar 24 12:08:34 2020 +0800
feat(broker) prevent blocked by group transfer service
---
.../src/main/java/org/apache/rocketmq/broker/BrokerController.java | 5 +++--
.../org/apache/rocketmq/broker/processor/SendMessageProcessor.java | 2 +-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 85009d6..194f285 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1240,6 +1240,7 @@ public class BrokerController {
}
}
-
-
+ public ExecutorService getSendMessageExecutor() {
+ return sendMessageExecutor;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 801d886..4dc311d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -79,7 +79,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
- asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback);
+ asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
public CompletableFuture<RemotingCommand>
asyncProcessRequest(ChannelHandlerContext ctx,