This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 31d10385d1 [ISSUE #7289] Fixed asynchronous send backpressure capability
31d10385d1 is described below

commit 31d10385d1616445478104ce9ef463a8c4852ba2
Author: guyinyou <[email protected]>
AuthorDate: Mon Sep 4 14:09:32 2023 +0800

    [ISSUE #7289] Fixed asynchronous send backpressure capability
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../impl/producer/DefaultMQProducerImpl.java       | 77 +++++++++++++++-------
 1 file changed, 53 insertions(+), 24 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index bbbb17b07a..2d6b83ac2c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -547,6 +547,8 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     @Deprecated
     public void send(final Message msg, final SendCallback sendCallback, final 
long timeout)
         throws MQClientException, RemotingException, InterruptedException {
+        BackpressureSendCallBack newCallBack = new 
BackpressureSendCallBack(sendCallback);
+
         final long beginStartTime = System.currentTimeMillis();
         Runnable runnable = new Runnable() {
             @Override
@@ -554,20 +556,53 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 long costTime = System.currentTimeMillis() - beginStartTime;
                 if (timeout > costTime) {
                     try {
-                        sendDefaultImpl(msg, CommunicationMode.ASYNC, 
sendCallback, timeout - costTime);
+                        sendDefaultImpl(msg, CommunicationMode.ASYNC, 
newCallBack, timeout - costTime);
                     } catch (Exception e) {
-                        sendCallback.onException(e);
+                        newCallBack.onException(e);
                     }
                 } else {
-                    sendCallback.onException(
+                    newCallBack.onException(
                         new RemotingTooMuchRequestException("DEFAULT ASYNC 
send call timeout"));
                 }
             }
         };
-        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, 
beginStartTime);
+        executeAsyncMessageSend(runnable, msg, newCallBack, timeout, 
beginStartTime);
     }
 
-    public void executeAsyncMessageSend(Runnable runnable, final Message msg, 
final SendCallback sendCallback,
+    class BackpressureSendCallBack implements SendCallback {
+        public boolean isSemaphoreAsyncSizeAquired = false;
+        public boolean isSemaphoreAsyncNumAquired = false;
+        public int msgLen;
+        private final SendCallback sendCallback;
+
+        public BackpressureSendCallBack(final SendCallback sendCallback) {
+            this.sendCallback = sendCallback;
+        }
+
+        @Override
+        public void onSuccess(SendResult sendResult) {
+            if (isSemaphoreAsyncSizeAquired) {
+                semaphoreAsyncSendSize.release(msgLen);
+            }
+            if (isSemaphoreAsyncNumAquired) {
+                semaphoreAsyncSendNum.release();
+            }
+            sendCallback.onSuccess(sendResult);
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            if (isSemaphoreAsyncSizeAquired) {
+                semaphoreAsyncSendSize.release(msgLen);
+            }
+            if (isSemaphoreAsyncNumAquired) {
+                semaphoreAsyncSendNum.release();
+            }
+            sendCallback.onException(e);
+        }
+    }
+
+    public void executeAsyncMessageSend(Runnable runnable, final Message msg, 
final BackpressureSendCallBack sendCallback,
         final long timeout, final long beginStartTime)
         throws MQClientException, InterruptedException {
         ExecutorService executor = this.getAsyncSenderExecutor();
@@ -595,7 +630,9 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                     return;
                 }
             }
-
+            sendCallback.isSemaphoreAsyncSizeAquired = 
isSemaphoreAsyncSizeAquired;
+            sendCallback.isSemaphoreAsyncNumAquired = 
isSemaphoreAsyncNumAquired;
+            sendCallback.msgLen = msgLen;
             executor.submit(runnable);
         } catch (RejectedExecutionException e) {
             if (isEnableBackpressureForAsyncMode) {
@@ -603,15 +640,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
             } else {
                 throw new MQClientException("executor rejected ", e);
             }
-        } finally {
-            if (isSemaphoreAsyncSizeAquired) {
-                semaphoreAsyncSendSize.release(msgLen);
-            }
-            if (isSemaphoreAsyncNumAquired) {
-                semaphoreAsyncSendNum.release();
-            }
         }
-
     }
 
     public MessageQueue invokeMessageQueueSelector(Message msg, 
MessageQueueSelector selector, Object arg,
@@ -1188,7 +1217,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     @Deprecated
     public void send(final Message msg, final MessageQueue mq, final 
SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-
+        BackpressureSendCallBack newCallBack = new 
BackpressureSendCallBack(sendCallback);
         final long beginStartTime = System.currentTimeMillis();
         Runnable runnable = new Runnable() {
             @Override
@@ -1203,22 +1232,22 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                     long costTime = System.currentTimeMillis() - 
beginStartTime;
                     if (timeout > costTime) {
                         try {
-                            sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
sendCallback, null,
+                            sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
newCallBack, null,
                                 timeout - costTime);
                         } catch (MQBrokerException e) {
                             throw new MQClientException("unknown exception", 
e);
                         }
                     } else {
-                        sendCallback.onException(new 
RemotingTooMuchRequestException("call timeout"));
+                        newCallBack.onException(new 
RemotingTooMuchRequestException("call timeout"));
                     }
                 } catch (Exception e) {
-                    sendCallback.onException(e);
+                    newCallBack.onException(e);
                 }
             }
 
         };
 
-        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, 
beginStartTime);
+        executeAsyncMessageSend(runnable, msg, newCallBack, timeout, 
beginStartTime);
     }
 
     /**
@@ -1315,7 +1344,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     public void send(final Message msg, final MessageQueueSelector selector, 
final Object arg,
         final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-
+        BackpressureSendCallBack newCallBack = new 
BackpressureSendCallBack(sendCallback);
         final long beginStartTime = System.currentTimeMillis();
         Runnable runnable = new Runnable() {
             @Override
@@ -1324,21 +1353,21 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 if (timeout > costTime) {
                     try {
                         try {
-                            sendSelectImpl(msg, selector, arg, 
CommunicationMode.ASYNC, sendCallback,
+                            sendSelectImpl(msg, selector, arg, 
CommunicationMode.ASYNC, newCallBack,
                                 timeout - costTime);
                         } catch (MQBrokerException e) {
                             throw new MQClientException("unknown exception", 
e);
                         }
                     } catch (Exception e) {
-                        sendCallback.onException(e);
+                        newCallBack.onException(e);
                     }
                 } else {
-                    sendCallback.onException(new 
RemotingTooMuchRequestException("call timeout"));
+                    newCallBack.onException(new 
RemotingTooMuchRequestException("call timeout"));
                 }
             }
 
         };
-        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, 
beginStartTime);
+        executeAsyncMessageSend(runnable, msg, newCallBack, timeout, 
beginStartTime);
     }
 
     /**

Reply via email to