This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c6820f23e send async,time calculation rules Multiple method patches (#4881)
c6820f23e is described below

commit c6820f23eb008af1eb250d8826f59891898975ae
Author: zhaows <[email protected]>
AuthorDate: Thu Sep 1 09:44:27 2022 +0800

    send async,time calculation rules Multiple method patches (#4881)
    
    * Clear out meaningless changes
    
    * merge upstream/dev
    
    * merge upstream/dev
    
    * send async,time calculation rules Multiple method patches
    
    * Clear out meaningless changes
    
    * merge upstream/dev
    
    * merge upstream/dev
    
    * send async,time calculation rules Multiple method patches
---
 .../impl/producer/DefaultMQProducerImpl.java       | 36 ++++++++--------------
 1 file changed, 12 insertions(+), 24 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 739ab0a73..304bee9de 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1040,7 +1040,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     @Deprecated
     public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        final long beginStartTime = System.currentTimeMillis();
         ExecutorService executor = this.getAsyncSenderExecutor();
         try {
             executor.submit(new Runnable() {
@@ -1053,16 +1052,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                         if (!msg.getTopic().equals(mq.getTopic())) {
                             throw new MQClientException("Topic of the message does not match its target message queue", null);
                         }
-                        long costTime = System.currentTimeMillis() - beginStartTime;
-                        if (timeout > costTime) {
-                            try {
-                                sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
-                                    timeout - costTime);
-                            } catch (MQBrokerException e) {
-                                throw new MQClientException("unknown exception", e);
-                            }
-                        } else {
-                            sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+                        try {
+                            sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
+                                    timeout);
+                        } catch (MQBrokerException e) {
+                            throw new MQClientException("unknown exception", e);
                         }
                     } catch (Exception e) {
                         sendCallback.onException(e);
@@ -1171,26 +1165,20 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
         final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        final long beginStartTime = System.currentTimeMillis();
         ExecutorService executor = this.getAsyncSenderExecutor();
         try {
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
-                    long costTime = System.currentTimeMillis() - beginStartTime;
-                    if (timeout > costTime) {
+                    try {
                         try {
-                            try {
-                                sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
-                                    timeout - costTime);
-                            } catch (MQBrokerException e) {
-                                throw new MQClientException("unknown exception", e);
-                            }
-                        } catch (Exception e) {
-                            sendCallback.onException(e);
+                            sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
+                                    timeout);
+                        } catch (MQBrokerException e) {
+                            throw new MQClientException("unknown exception", e);
                         }
-                    } else {
-                        sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+                    } catch (Exception e) {
+                        sendCallback.onException(e);
                     }
                 }
 

Reply via email to