This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2d44ec897c [ISSUE #8261] Avoid unnecessary waiting when a response is
successfully returned (#8272)
2d44ec897c is described below
commit 2d44ec897c5ff9e1ce46a8ac8765c5cf493c7ac6
Author: hqbfz <[email protected]>
AuthorDate: Mon Jul 29 18:55:55 2024 +0800
[ISSUE #8261] Avoid unnecessary waiting when a response is successfully
returned (#8272)
---
.../apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 5 ++++-
.../org/apache/rocketmq/client/producer/RequestResponseFuture.java | 4 ++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7ef3402513..0e70ee2595 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -969,7 +969,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
- //Clone new message using commpressed message body and recover origin massage.
+ //Clone new message using compressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
@@ -1538,6 +1538,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
+ requestResponseFuture.acquireCountDownLatch();
}
@Override
@@ -1595,6 +1596,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
+ requestResponseFuture.acquireCountDownLatch();
}
@Override
@@ -1652,6 +1654,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
+ requestResponseFuture.acquireCountDownLatch();
}
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
index e66c08fdc5..203f92907a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
@@ -107,6 +107,10 @@ public class RequestResponseFuture {
this.sendRequestOk = sendRequestOk;
}
+ public void acquireCountDownLatch() {
+ this.countDownLatch.countDown();
+ }
+
public Message getRequestMsg() {
return requestMsg;
}