This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 373932791 [ISSUE#4599] Optimize enableBackpressureForAsyncMode not
relying on NettyRemoting (#4601)
373932791 is described below
commit 3739327915c067861bcd5c64e66ebbf6adb439d2
Author: Shengmin Wang <[email protected]>
AuthorDate: Mon Sep 5 11:46:12 2022 +0800
[ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on
NettyRemoting (#4601)
* optimize backpressure: add limits on the number of on-going sending threads and on message size
* modify send mechanism: keep the thread pool in backpressure mode
* modify library reference style
* modify code style
* change default semaphoreAsyncSize to 100M
* move semaphore to DefaultMQProducerImpl, expose the integer value in
DefaultMQProducer
* modify code style
* add blank to trigger CI test
* add check condition to ensure timeout - costTime stays positive
* add semaphoreAsyncSendNum and semaphoreAsyncSendSize limit
* add semaphoreAsyncSendNum and semaphoreAsyncSendSize limit
---
.../impl/producer/DefaultMQProducerImpl.java | 153 ++++++++++++++++-----
.../client/producer/DefaultMQProducer.java | 44 ++++++
.../client/producer/DefaultMQProducerTest.java | 33 ++++-
.../rocketmq/example/simple/AsyncProducer.java | 2 +
4 files changed, 194 insertions(+), 38 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 304bee9de..842853c48 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
@@ -120,6 +121,10 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
private CompressionType compressType =
CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
private final Compressor compressor =
CompressorFactory.getCompressor(compressType);
+ // backpressure related
+ private Semaphore semaphoreAsyncSendNum;
+ private Semaphore semaphoreAsyncSendSize;
+
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
}
@@ -143,6 +148,19 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
return new Thread(r, "AsyncSenderExecutor_" +
this.threadIndex.incrementAndGet());
}
});
+ if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) {
+ semaphoreAsyncSendNum = new
Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10),
true);
+ } else {
+ semaphoreAsyncSendNum = new Semaphore(10, true);
+ log.info("semaphoreAsyncSendNum can not be smaller than 10.");
+ }
+
+ if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) {
+ semaphoreAsyncSendSize = new
Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 *
1024), true);
+ } else {
+ semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
+ log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
+ }
}
public void registerCheckForbiddenHook(CheckForbiddenHook
checkForbiddenHook) {
@@ -151,6 +169,14 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
checkForbiddenHookList.size());
}
+ public void setSemaphoreAsyncSendNum(int num) {
+ semaphoreAsyncSendNum = new Semaphore(num, true);
+ }
+
+ public void setSemaphoreAsyncSendSize(int size) {
+ semaphoreAsyncSendSize = new Semaphore(size, true);
+ }
+
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer)
this.defaultMQProducer;
if (producer.getExecutorService() != null) {
@@ -491,21 +517,69 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final
long timeout)
throws MQClientException, RemotingException, InterruptedException {
- ExecutorService executor = this.getAsyncSenderExecutor();
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
+ final long beginStartTime = System.currentTimeMillis();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout > costTime) {
try {
- sendDefaultImpl(msg, CommunicationMode.ASYNC,
sendCallback, timeout);
+ sendDefaultImpl(msg, CommunicationMode.ASYNC,
sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
}
+ } else {
+ sendCallback.onException(
+ new RemotingTooMuchRequestException("DEFAULT ASYNC
send call timeout"));
}
+ }
+ };
+ executeAsyncMessageSend(runnable, msg, sendCallback, timeout,
beginStartTime);
+ }
- });
+ public void executeAsyncMessageSend(Runnable runnable, final Message msg,
final SendCallback sendCallback,
+ final long timeout, final long
beginStartTime)
+ throws MQClientException, InterruptedException {
+ ExecutorService executor = this.getAsyncSenderExecutor();
+ boolean isEnableBackpressureForAsyncMode =
this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
+ boolean isSemaphoreAsyncNumAquired = false;
+ boolean isSemaphoreAsyncSizeAquired = false;
+ int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
+
+ try {
+ if (isEnableBackpressureForAsyncMode) {
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ isSemaphoreAsyncNumAquired = timeout - costTime > 0
+ && semaphoreAsyncSendNum.tryAcquire(timeout -
costTime, TimeUnit.MILLISECONDS);
+ if (!isSemaphoreAsyncNumAquired) {
+ sendCallback.onException(
+ new RemotingTooMuchRequestException("send message
tryAcquire semaphoreAsyncNum timeout"));
+ return;
+ }
+ costTime = System.currentTimeMillis() - beginStartTime;
+ isSemaphoreAsyncSizeAquired = timeout - costTime > 0
+ && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout -
costTime, TimeUnit.MILLISECONDS);
+ if (!isSemaphoreAsyncSizeAquired) {
+ sendCallback.onException(
+ new RemotingTooMuchRequestException("send message
tryAcquire semaphoreAsyncSize timeout"));
+ return;
+ }
+ }
+
+ executor.submit(runnable);
} catch (RejectedExecutionException e) {
- throw new MQClientException("executor rejected ", e);
+ if (isEnableBackpressureForAsyncMode) {
+ runnable.run();
+ } else {
+ throw new MQClientException("executor rejected ", e);
+ }
+ } finally {
+ if (isSemaphoreAsyncSizeAquired) {
+ semaphoreAsyncSendSize.release(msgLen);
+ }
+ if (isSemaphoreAsyncNumAquired) {
+ semaphoreAsyncSendNum.release();
+ }
}
}
@@ -1040,35 +1114,37 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
@Deprecated
public void send(final Message msg, final MessageQueue mq, final
SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
- ExecutorService executor = this.getAsyncSenderExecutor();
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- makeSureStateOK();
- Validators.checkMessage(msg, defaultMQProducer);
- if (!msg.getTopic().equals(mq.getTopic())) {
- throw new MQClientException("Topic of the message
does not match its target message queue", null);
- }
+ final long beginStartTime = System.currentTimeMillis();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ makeSureStateOK();
+ Validators.checkMessage(msg, defaultMQProducer);
+
+ if (!msg.getTopic().equals(mq.getTopic())) {
+ throw new MQClientException("Topic of the message does
not match its target message queue", null);
+ }
+ long costTime = System.currentTimeMillis() -
beginStartTime;
+ if (timeout > costTime) {
try {
sendKernelImpl(msg, mq, CommunicationMode.ASYNC,
sendCallback, null,
- timeout);
+ timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception",
e);
}
- } catch (Exception e) {
- sendCallback.onException(e);
+ } else {
+ sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
}
-
+ } catch (Exception e) {
+ sendCallback.onException(e);
}
+ }
- });
- } catch (RejectedExecutionException e) {
- throw new MQClientException("executor rejected ", e);
- }
+ };
+ executeAsyncMessageSend(runnable, msg, sendCallback, timeout,
beginStartTime);
}
/**
@@ -1165,27 +1241,30 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
public void send(final Message msg, final MessageQueueSelector selector,
final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
- ExecutorService executor = this.getAsyncSenderExecutor();
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
+
+ final long beginStartTime = System.currentTimeMillis();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout > costTime) {
try {
try {
sendSelectImpl(msg, selector, arg,
CommunicationMode.ASYNC, sendCallback,
- timeout);
+ timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception",
e);
}
} catch (Exception e) {
sendCallback.onException(e);
}
+ } else {
+ sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
}
+ }
- });
- } catch (RejectedExecutionException e) {
- throw new MQClientException("executor rejected ", e);
- }
+ };
+ executeAsyncMessageSend(runnable, msg, sendCallback, timeout,
beginStartTime);
}
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 4a1cc3366..15e1da22a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -134,6 +134,23 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
private TraceDispatcher traceDispatcher = null;
+ /**
+ * Indicate whether to block message when asynchronous sending traffic is
too heavy.
+ */
+ private boolean enableBackpressureForAsyncMode = false;
+
+ /**
+ * on BackpressureForAsyncMode, limit maximum number of on-going sending
async messages
+ * default is 10000
+ */
+ private int backPressureForAsyncSendNum = 10000;
+
+ /**
+ * on BackpressureForAsyncMode, limit maximum message size of on-going
sending async messages
+ * default is 100M
+ */
+ private int backPressureForAsyncSendSize = 100 * 1024 * 1024;
+
/**
* Default constructor.
*/
@@ -1111,4 +1128,31 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
public Set<Integer> getRetryResponseCodes() {
return retryResponseCodes;
}
+
+ public boolean isEnableBackpressureForAsyncMode() {
+ return enableBackpressureForAsyncMode;
+ }
+
+ public void setEnableBackpressureForAsyncMode(boolean
enableBackpressureForAsyncMode) {
+ this.enableBackpressureForAsyncMode = enableBackpressureForAsyncMode;
+ }
+
+ public int getBackPressureForAsyncSendNum() {
+ return backPressureForAsyncSendNum;
+ }
+
+ public void setBackPressureForAsyncSendNum(int
backPressureForAsyncSendNum) {
+ this.backPressureForAsyncSendNum = backPressureForAsyncSendNum;
+
defaultMQProducerImpl.setSemaphoreAsyncSendNum(backPressureForAsyncSendNum);
+ }
+
+ public int getBackPressureForAsyncSendSize() {
+ return backPressureForAsyncSendSize;
+ }
+
+ public void setBackPressureForAsyncSendSize(int
backPressureForAsyncSendSize) {
+ this.backPressureForAsyncSendSize = backPressureForAsyncSendSize;
+
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
+ }
+
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 919fb009f..ebfa8b2a5 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -193,7 +193,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync() throws RemotingException,
MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
- final CountDownLatch countDownLatch = new CountDownLatch(6);
+ final CountDownLatch countDownLatch = new CountDownLatch(12);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
SendCallback sendCallback = new SendCallback() {
@@ -216,6 +216,10 @@ public class DefaultMQProducerTest {
}
};
+ // on enableBackpressureForAsyncMode
+ producer.setEnableBackpressureForAsyncMode(true);
+ producer.setBackPressureForAsyncSendNum(5000);
+ producer.setBackPressureForAsyncSendSize(50 * 1024 * 1024);
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
@@ -229,6 +233,19 @@ public class DefaultMQProducerTest {
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(5);
+
+ // off enableBackpressureForAsyncMode
+ producer.setEnableBackpressureForAsyncMode(false);
+ producer.send(new Message(), sendCallback);
+ producer.send(message, new MessageQueue(), sendCallback);
+ producer.send(new Message(), new MessageQueue(), sendCallback, 1000);
+ producer.send(new Message(), messageQueueSelector, null, sendCallback);
+ producer.send(message, messageQueueSelector, null, sendCallback, 1000);
+ //this message is send success
+ producer.send(message, sendCallback, 1000);
+
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ assertThat(cc.get()).isEqualTo(10);
}
@Test
@@ -265,6 +282,9 @@ public class DefaultMQProducerTest {
message.setBody(("hello world" + i).getBytes());
msgs.add(message);
}
+
+ // on enableBackpressureForAsyncMode
+ producer.setEnableBackpressureForAsyncMode(true);
producer.send(msgs, sendCallback);
producer.send(msgs, sendCallback, 1000);
MessageQueue mq = new MessageQueue("test", "BrokerA", 1);
@@ -274,6 +294,17 @@ public class DefaultMQProducerTest {
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);
+
+ // off enableBackpressureForAsyncMode
+ producer.setEnableBackpressureForAsyncMode(false);
+ producer.send(msgs, sendCallback);
+ producer.send(msgs, sendCallback, 1000);
+ producer.send(msgs, mq, sendCallback);
+ // this message is send failed
+ producer.send(msgs, new MessageQueue(), sendCallback, 1000);
+
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ assertThat(cc.get()).isEqualTo(2);
}
@Test
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
index d40739c81..42d19b1b6 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
@@ -32,6 +32,8 @@ public class AsyncProducer {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.start();
+ // suggest to on enableBackpressureForAsyncMode in heavy traffic,
default is false
+ producer.setEnableBackpressureForAsyncMode(true);
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;