This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 31d10385d1 [ISSUE #7289] Fixed asynchronous send backpressure capability
31d10385d1 is described below
commit 31d10385d1616445478104ce9ef463a8c4852ba2
Author: guyinyou <[email protected]>
AuthorDate: Mon Sep 4 14:09:32 2023 +0800
[ISSUE #7289] Fixed asynchronous send backpressure capability
Co-authored-by: guyinyou <[email protected]>
---
.../impl/producer/DefaultMQProducerImpl.java | 77 +++++++++++++++-------
1 file changed, 53 insertions(+), 24 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index bbbb17b07a..2d6b83ac2c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -547,6 +547,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final
long timeout)
throws MQClientException, RemotingException, InterruptedException {
+ BackpressureSendCallBack newCallBack = new
BackpressureSendCallBack(sendCallback);
+
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
@@ -554,20 +556,53 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
- sendDefaultImpl(msg, CommunicationMode.ASYNC,
sendCallback, timeout - costTime);
+ sendDefaultImpl(msg, CommunicationMode.ASYNC,
newCallBack, timeout - costTime);
} catch (Exception e) {
- sendCallback.onException(e);
+ newCallBack.onException(e);
}
} else {
- sendCallback.onException(
+ newCallBack.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC
send call timeout"));
}
}
};
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout,
beginStartTime);
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout,
beginStartTime);
}
- public void executeAsyncMessageSend(Runnable runnable, final Message msg,
final SendCallback sendCallback,
+ class BackpressureSendCallBack implements SendCallback {
+ public boolean isSemaphoreAsyncSizeAquired = false;
+ public boolean isSemaphoreAsyncNumAquired = false;
+ public int msgLen;
+ private final SendCallback sendCallback;
+
+ public BackpressureSendCallBack(final SendCallback sendCallback) {
+ this.sendCallback = sendCallback;
+ }
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ if (isSemaphoreAsyncSizeAquired) {
+ semaphoreAsyncSendSize.release(msgLen);
+ }
+ if (isSemaphoreAsyncNumAquired) {
+ semaphoreAsyncSendNum.release();
+ }
+ sendCallback.onSuccess(sendResult);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ if (isSemaphoreAsyncSizeAquired) {
+ semaphoreAsyncSendSize.release(msgLen);
+ }
+ if (isSemaphoreAsyncNumAquired) {
+ semaphoreAsyncSendNum.release();
+ }
+ sendCallback.onException(e);
+ }
+ }
+
+ public void executeAsyncMessageSend(Runnable runnable, final Message msg,
final BackpressureSendCallBack sendCallback,
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
@@ -595,7 +630,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return;
}
}
-
+ sendCallback.isSemaphoreAsyncSizeAquired =
isSemaphoreAsyncSizeAquired;
+ sendCallback.isSemaphoreAsyncNumAquired =
isSemaphoreAsyncNumAquired;
+ sendCallback.msgLen = msgLen;
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
@@ -603,15 +640,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} else {
throw new MQClientException("executor rejected ", e);
}
- } finally {
- if (isSemaphoreAsyncSizeAquired) {
- semaphoreAsyncSendSize.release(msgLen);
- }
- if (isSemaphoreAsyncNumAquired) {
- semaphoreAsyncSendNum.release();
- }
}
-
}
public MessageQueue invokeMessageQueueSelector(Message msg,
MessageQueueSelector selector, Object arg,
@@ -1188,7 +1217,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Deprecated
public void send(final Message msg, final MessageQueue mq, final
SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
-
+ BackpressureSendCallBack newCallBack = new
BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
@@ -1203,22 +1232,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long costTime = System.currentTimeMillis() -
beginStartTime;
if (timeout > costTime) {
try {
- sendKernelImpl(msg, mq, CommunicationMode.ASYNC,
sendCallback, null,
+ sendKernelImpl(msg, mq, CommunicationMode.ASYNC,
newCallBack, null,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception",
e);
}
} else {
- sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
+ newCallBack.onException(new
RemotingTooMuchRequestException("call timeout"));
}
} catch (Exception e) {
- sendCallback.onException(e);
+ newCallBack.onException(e);
}
}
};
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout,
beginStartTime);
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout,
beginStartTime);
}
/**
@@ -1315,7 +1344,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueueSelector selector,
final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
-
+ BackpressureSendCallBack newCallBack = new
BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
@@ -1324,21 +1353,21 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (timeout > costTime) {
try {
try {
- sendSelectImpl(msg, selector, arg,
CommunicationMode.ASYNC, sendCallback,
+ sendSelectImpl(msg, selector, arg,
CommunicationMode.ASYNC, newCallBack,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception",
e);
}
} catch (Exception e) {
- sendCallback.onException(e);
+ newCallBack.onException(e);
}
} else {
- sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
+ newCallBack.onException(new
RemotingTooMuchRequestException("call timeout"));
}
}
};
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout,
beginStartTime);
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout,
beginStartTime);
}
/**