This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c6820f23e send async,time calculation rules Multiple method patches
(#4881)
c6820f23e is described below
commit c6820f23eb008af1eb250d8826f59891898975ae
Author: zhaows <[email protected]>
AuthorDate: Thu Sep 1 09:44:27 2022 +0800
send async,time calculation rules Multiple method patches (#4881)
* Clear out meaningless changes
* merge upstream/dev
* merge upstream/dev
* send async,time calculation rules Multiple method patches
* Clear out meaningless changes
* merge upstream/dev
* merge upstream/dev
* send async,time calculation rules Multiple method patches
---
.../impl/producer/DefaultMQProducerImpl.java | 36 ++++++++--------------
1 file changed, 12 insertions(+), 24 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 739ab0a73..304bee9de 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1040,7 +1040,6 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
@Deprecated
public void send(final Message msg, final MessageQueue mq, final
SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
- final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@@ -1053,16 +1052,11 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("Topic of the message
does not match its target message queue", null);
}
- long costTime = System.currentTimeMillis() -
beginStartTime;
- if (timeout > costTime) {
- try {
- sendKernelImpl(msg, mq,
CommunicationMode.ASYNC, sendCallback, null,
- timeout - costTime);
- } catch (MQBrokerException e) {
- throw new MQClientException("unknown
exception", e);
- }
- } else {
- sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
+ try {
+ sendKernelImpl(msg, mq, CommunicationMode.ASYNC,
sendCallback, null,
+ timeout);
+ } catch (MQBrokerException e) {
+ throw new MQClientException("unknown exception",
e);
}
} catch (Exception e) {
sendCallback.onException(e);
@@ -1171,26 +1165,20 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
public void send(final Message msg, final MessageQueueSelector selector,
final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
- final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
- long costTime = System.currentTimeMillis() -
beginStartTime;
- if (timeout > costTime) {
+ try {
try {
- try {
- sendSelectImpl(msg, selector, arg,
CommunicationMode.ASYNC, sendCallback,
- timeout - costTime);
- } catch (MQBrokerException e) {
- throw new MQClientException("unknown
exception", e);
- }
- } catch (Exception e) {
- sendCallback.onException(e);
+ sendSelectImpl(msg, selector, arg,
CommunicationMode.ASYNC, sendCallback,
+ timeout);
+ } catch (MQBrokerException e) {
+ throw new MQClientException("unknown exception",
e);
}
- } else {
- sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
+ } catch (Exception e) {
+ sendCallback.onException(e);
}
}