This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 373932791 [ISSUE#4599] Optimize enableBackpressureForAsyncMode not 
relying on NettyRemoting (#4601)
373932791 is described below

commit 3739327915c067861bcd5c64e66ebbf6adb439d2
Author: Shengmin Wang <[email protected]>
AuthorDate: Mon Sep 5 11:46:12 2022 +0800

    [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on 
NettyRemoting (#4601)
    
    * optimize backpressure, add on-going thread nums and message size limit
    
    * modify send mechanism, reserve threadpool in backpressure mode
    
    * modify library reference style
    
    * modify code style
    
    * modify default semaphoreAsyncSize as 100M
    
    * move semaphore to DefaultMQProducerImpl, expose the integer value in 
DefaultMQProducer
    
    * modify code style
    
    * add blank to trigger CI test
    
    * add check condition, keep timeout - costTime is active
    
    * add semaphoreAsyncSendNum and semaphoreAsyncSendSize limit
    
    * add semaphoreAsyncSendNum and semaphoreAsyncSendSize limit
---
 .../impl/producer/DefaultMQProducerImpl.java       | 153 ++++++++++++++++-----
 .../client/producer/DefaultMQProducer.java         |  44 ++++++
 .../client/producer/DefaultMQProducerTest.java     |  33 ++++-
 .../rocketmq/example/simple/AsyncProducer.java     |   2 +
 4 files changed, 194 insertions(+), 38 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 304bee9de..842853c48 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
@@ -120,6 +121,10 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     private CompressionType compressType = 
CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
     private final Compressor compressor = 
CompressorFactory.getCompressor(compressType);
 
+    // backpressure related
+    private Semaphore semaphoreAsyncSendNum;
+    private Semaphore semaphoreAsyncSendSize;
+
     public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
         this(defaultMQProducer, null);
     }
@@ -143,6 +148,19 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                     return new Thread(r, "AsyncSenderExecutor_" + 
this.threadIndex.incrementAndGet());
                 }
             });
+        if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) {
+            semaphoreAsyncSendNum = new 
Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10), 
true);
+        } else {
+            semaphoreAsyncSendNum = new Semaphore(10, true);
+            log.info("semaphoreAsyncSendNum can not be smaller than 10.");
+        }
+
+        if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) {
+            semaphoreAsyncSendSize = new 
Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 * 
1024), true);
+        } else {
+            semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
+            log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
+        }
     }
 
     public void registerCheckForbiddenHook(CheckForbiddenHook 
checkForbiddenHook) {
@@ -151,6 +169,14 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
             checkForbiddenHookList.size());
     }
 
+    /**
+     * Replaces the semaphore that limits the number of on-going async sends.
+     * Values below 10 are raised to 10, the same floor the constructor applies, so that a
+     * zero or negative value cannot produce a semaphore that is impossible to acquire.
+     * NOTE(review): a send that acquired from the previous semaphore releases to whichever
+     * semaphore this field holds at release time - confirm callers only reconfigure while idle.
+     *
+     * @param num requested number of permits
+     */
+    public void setSemaphoreAsyncSendNum(int num) {
+        if (num < 10) {
+            log.info("semaphoreAsyncSendNum can not be smaller than 10.");
+        }
+        semaphoreAsyncSendNum = new Semaphore(Math.max(num, 10), true);
+    }
+
+    /**
+     * Replaces the semaphore that limits the total size of on-going async sends.
+     * Values below 1M (1024 * 1024) are raised to 1M, the same floor the constructor applies,
+     * so that a zero or negative value cannot produce a semaphore that is impossible to acquire.
+     * NOTE(review): a send that acquired from the previous semaphore releases to whichever
+     * semaphore this field holds at release time - confirm callers only reconfigure while idle.
+     *
+     * @param size requested number of permits (one permit per body byte)
+     */
+    public void setSemaphoreAsyncSendSize(int size) {
+        if (size < 1024 * 1024) {
+            log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
+        }
+        semaphoreAsyncSendSize = new Semaphore(Math.max(size, 1024 * 1024), true);
+    }
+
     public void initTransactionEnv() {
         TransactionMQProducer producer = (TransactionMQProducer) 
this.defaultMQProducer;
         if (producer.getExecutorService() != null) {
@@ -491,21 +517,69 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     @Deprecated
     public void send(final Message msg, final SendCallback sendCallback, final 
long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        ExecutorService executor = this.getAsyncSenderExecutor();
-        try {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
+        final long beginStartTime = System.currentTimeMillis();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeout > costTime) {
                     try {
-                        sendDefaultImpl(msg, CommunicationMode.ASYNC, 
sendCallback, timeout);
+                        sendDefaultImpl(msg, CommunicationMode.ASYNC, 
sendCallback, timeout - costTime);
                     } catch (Exception e) {
                         sendCallback.onException(e);
                     }
+                } else {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("DEFAULT ASYNC 
send call timeout"));
                 }
+            }
+        };
+        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, 
beginStartTime);
+    }
 
-            });
+    public void executeAsyncMessageSend(Runnable runnable, final Message msg, 
final SendCallback sendCallback,
+                                         final long timeout, final long 
beginStartTime)
+            throws MQClientException, InterruptedException { // Submits runnable to the async sender executor; with backpressure enabled it first acquires one permit from semaphoreAsyncSendNum and msgLen permits from semaphoreAsyncSendSize.
+        ExecutorService executor = this.getAsyncSenderExecutor();
+        boolean isEnableBackpressureForAsyncMode = 
this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
+        boolean isSemaphoreAsyncNumAquired = false;
+        boolean isSemaphoreAsyncSizeAquired = false;
+        int msgLen = msg.getBody() == null ? 1 : msg.getBody().length; // permits charged to semaphoreAsyncSendSize: the body length, or 1 when the body is null
+
+        try {
+            if (isEnableBackpressureForAsyncMode) { // each tryAcquire waits at most (timeout - costTime) ms; on failure the callback receives RemotingTooMuchRequestException and the method returns
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                isSemaphoreAsyncNumAquired = timeout - costTime > 0
+                        && semaphoreAsyncSendNum.tryAcquire(timeout - 
costTime, TimeUnit.MILLISECONDS);
+                if (!isSemaphoreAsyncNumAquired) {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("send message 
tryAcquire semaphoreAsyncNum timeout"));
+                    return;
+                }
+                costTime = System.currentTimeMillis() - beginStartTime;
+                isSemaphoreAsyncSizeAquired = timeout - costTime > 0
+                        && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - 
costTime, TimeUnit.MILLISECONDS);
+                if (!isSemaphoreAsyncSizeAquired) {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("send message 
tryAcquire semaphoreAsyncSize timeout"));
+                    return;
+                }
+            }
+
+            executor.submit(runnable);
         } catch (RejectedExecutionException e) {
-            throw new MQClientException("executor rejected ", e);
+            if (isEnableBackpressureForAsyncMode) {
+                runnable.run(); // the executor rejected the task while backpressure is on: run it on the calling thread instead of failing
+            } else {
+                throw new MQClientException("executor rejected ", e);
+            }
+        } finally { // NOTE(review): this runs as soon as submit() or the inline run() returns, so permits are given back before the queued task is known to have executed - confirm this is the intended backpressure window
+            if (isSemaphoreAsyncSizeAquired) {
+                semaphoreAsyncSendSize.release(msgLen);
+            }
+            if (isSemaphoreAsyncNumAquired) {
+                semaphoreAsyncSendNum.release();
+            }
         }
 
     }
@@ -1040,35 +1114,37 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     @Deprecated
     public void send(final Message msg, final MessageQueue mq, final 
SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        ExecutorService executor = this.getAsyncSenderExecutor();
-        try {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        makeSureStateOK();
-                        Validators.checkMessage(msg, defaultMQProducer);
 
-                        if (!msg.getTopic().equals(mq.getTopic())) {
-                            throw new MQClientException("Topic of the message 
does not match its target message queue", null);
-                        }
+        final long beginStartTime = System.currentTimeMillis();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    makeSureStateOK();
+                    Validators.checkMessage(msg, defaultMQProducer);
+
+                    if (!msg.getTopic().equals(mq.getTopic())) {
+                        throw new MQClientException("Topic of the message does 
not match its target message queue", null);
+                    }
+                    long costTime = System.currentTimeMillis() - 
beginStartTime;
+                    if (timeout > costTime) {
                         try {
                             sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
sendCallback, null,
-                                    timeout);
+                                timeout - costTime);
                         } catch (MQBrokerException e) {
                             throw new MQClientException("unknown exception", 
e);
                         }
-                    } catch (Exception e) {
-                        sendCallback.onException(e);
+                    } else {
+                        sendCallback.onException(new 
RemotingTooMuchRequestException("call timeout"));
                     }
-
+                } catch (Exception e) {
+                    sendCallback.onException(e);
                 }
+            }
 
-            });
-        } catch (RejectedExecutionException e) {
-            throw new MQClientException("executor rejected ", e);
-        }
+        };
 
+        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, 
beginStartTime);
     }
 
     /**
@@ -1165,27 +1241,30 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     public void send(final Message msg, final MessageQueueSelector selector, 
final Object arg,
         final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        ExecutorService executor = this.getAsyncSenderExecutor();
-        try {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
+
+        final long beginStartTime = System.currentTimeMillis();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeout > costTime) {
                     try {
                         try {
                             sendSelectImpl(msg, selector, arg, 
CommunicationMode.ASYNC, sendCallback,
-                                    timeout);
+                                    timeout - costTime);
                         } catch (MQBrokerException e) {
                             throw new MQClientException("unknown exception", 
e);
                         }
                     } catch (Exception e) {
                         sendCallback.onException(e);
                     }
+                } else {
+                    sendCallback.onException(new 
RemotingTooMuchRequestException("call timeout"));
                 }
+            }
 
-            });
-        } catch (RejectedExecutionException e) {
-            throw new MQClientException("executor rejected ", e);
-        }
+        };
+        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, 
beginStartTime);
     }
 
     /**
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 4a1cc3366..15e1da22a 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -134,6 +134,23 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Indicate whether to block message when asynchronous sending traffic is 
too heavy.
+     */
+    private boolean enableBackpressureForAsyncMode = false;
+
+    /**
+     * When enableBackpressureForAsyncMode is on, limits the maximum number of 
on-going asynchronous sends.
+     * The default is 10000.
+     */
+    private int backPressureForAsyncSendNum = 10000;
+
+    /**
+     * When enableBackpressureForAsyncMode is on, limits the total body size 
(in bytes) of on-going asynchronous sends.
+     * The default is 100M (100 * 1024 * 1024).
+     */
+    private int backPressureForAsyncSendSize = 100 * 1024 * 1024;
+
     /**
      * Default constructor.
      */
@@ -1111,4 +1128,31 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     public Set<Integer> getRetryResponseCodes() {
         return retryResponseCodes;
     }
+
+    public boolean isEnableBackpressureForAsyncMode() {
+        return  enableBackpressureForAsyncMode;
+    }
+
+    public void setEnableBackpressureForAsyncMode(boolean 
enableBackpressureForAsyncMode) {
+        this.enableBackpressureForAsyncMode = enableBackpressureForAsyncMode;
+    }
+
+    public int getBackPressureForAsyncSendNum() {
+        return backPressureForAsyncSendNum;
+    }
+
+    public void setBackPressureForAsyncSendNum(int 
backPressureForAsyncSendNum) { // stores the limit on this producer and forwards it to the producer impl
+        this.backPressureForAsyncSendNum = backPressureForAsyncSendNum;
+        
defaultMQProducerImpl.setSemaphoreAsyncSendNum(backPressureForAsyncSendNum); // the impl replaces its request-count semaphore based on this value
+    }
+
+    public int getBackPressureForAsyncSendSize() {
+        return backPressureForAsyncSendSize;
+    }
+
+    public void setBackPressureForAsyncSendSize(int 
backPressureForAsyncSendSize) { // stores the limit on this producer and forwards it to the producer impl
+        this.backPressureForAsyncSendSize = backPressureForAsyncSendSize;
+        
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize); // the impl replaces its message-size semaphore based on this value
+    }
+
+
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 919fb009f..ebfa8b2a5 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -193,7 +193,7 @@ public class DefaultMQProducerTest {
     @Test
     public void testSendMessageAsync() throws RemotingException, 
MQClientException, InterruptedException {
         final AtomicInteger cc = new AtomicInteger(0);
-        final CountDownLatch countDownLatch = new CountDownLatch(6);
+        final CountDownLatch countDownLatch = new CountDownLatch(12);
 
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         SendCallback sendCallback = new SendCallback() {
@@ -216,6 +216,10 @@ public class DefaultMQProducerTest {
             }
         };
 
+        // on enableBackpressureForAsyncMode
+        producer.setEnableBackpressureForAsyncMode(true);
+        producer.setBackPressureForAsyncSendNum(5000);
+        producer.setBackPressureForAsyncSendSize(50 * 1024 * 1024);
         Message message = new Message();
         message.setTopic("test");
         message.setBody("hello world".getBytes());
@@ -229,6 +233,19 @@ public class DefaultMQProducerTest {
 
         countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(5);
+
+        // off enableBackpressureForAsyncMode 
+        producer.setEnableBackpressureForAsyncMode(false);
+        producer.send(new Message(), sendCallback);
+        producer.send(message, new MessageQueue(), sendCallback);
+        producer.send(new Message(), new MessageQueue(), sendCallback, 1000);
+        producer.send(new Message(), messageQueueSelector, null, sendCallback);
+        producer.send(message, messageQueueSelector, null, sendCallback, 1000);
+        // this message is sent successfully
+        producer.send(message, sendCallback, 1000);
+
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        assertThat(cc.get()).isEqualTo(10);
     }
     
     @Test
@@ -265,6 +282,9 @@ public class DefaultMQProducerTest {
             message.setBody(("hello world" + i).getBytes());
             msgs.add(message);
         }
+
+        // on enableBackpressureForAsyncMode
+        producer.setEnableBackpressureForAsyncMode(true);
         producer.send(msgs, sendCallback);
         producer.send(msgs, sendCallback, 1000);
         MessageQueue mq = new MessageQueue("test", "BrokerA", 1);
@@ -274,6 +294,17 @@ public class DefaultMQProducerTest {
 
         countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(1);
+
+        // off enableBackpressureForAsyncMode
+        producer.setEnableBackpressureForAsyncMode(false);
+        producer.send(msgs, sendCallback);
+        producer.send(msgs, sendCallback, 1000);
+        producer.send(msgs, mq, sendCallback);
+        // this message fails to send
+        producer.send(msgs, new MessageQueue(), sendCallback, 1000);
+
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        assertThat(cc.get()).isEqualTo(2);
     }
 
     @Test
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java 
b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
index d40739c81..42d19b1b6 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
@@ -32,6 +32,8 @@ public class AsyncProducer {
 
         DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
         producer.start();
+        // enabling enableBackpressureForAsyncMode is recommended under heavy 
traffic; it is disabled by default
+        producer.setEnableBackpressureForAsyncMode(true);
         producer.setRetryTimesWhenSendAsyncFailed(0);
 
         int messageCount = 100;

Reply via email to