vongosling closed pull request #582: Update asynchronous send thread pool from
callback executor to an exclusive one.
URL: https://github.com/apache/rocketmq/pull/582
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7ace9d5b0..90f4f7876 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -30,8 +30,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -101,6 +103,10 @@
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
+ private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
+ private final ExecutorService defaultAsyncSenderExecutor;
+ private ExecutorService asyncSenderExecutor;
+
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
}
@@ -108,6 +114,22 @@ public DefaultMQProducerImpl(final DefaultMQProducer
defaultMQProducer) {
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer,
RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
+
+ this.asyncSenderThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(50000);
+ this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.asyncSenderThreadPoolQueue,
+ new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "AsyncSenderExecutor_" +
this.threadIndex.incrementAndGet());
+ }
+ });
}
public void registerCheckForbiddenHook(CheckForbiddenHook
checkForbiddenHook) {
@@ -456,7 +478,7 @@ public void send(Message msg,
public void send(final Message msg, final SendCallback sendCallback, final
long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
@@ -957,7 +979,7 @@ public void send(Message msg, MessageQueue mq, SendCallback
sendCallback)
public void send(final Message msg, final MessageQueue mq, final
SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
@@ -1079,7 +1101,7 @@ public void send(Message msg, MessageQueueSelector
selector, Object arg, SendCal
public void send(final Message msg, final MessageQueueSelector selector,
final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
@@ -1243,9 +1265,13 @@ public void endTransaction(
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}
- public ExecutorService getCallbackExecutor() {
- return
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
+ public ExecutorService getAsyncSenderExecutor() {
+ return null == asyncSenderExecutor ? defaultAsyncSenderExecutor :
asyncSenderExecutor;
+ }
+
+ public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
+ this.asyncSenderExecutor = asyncSenderExecutor;
}
public SendResult send(Message msg,
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9732d0eb8..f57e52c3e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -655,6 +655,16 @@ public void setCallbackExecutor(final ExecutorService
callbackExecutor) {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
}
+ /**
+ * Sets an Executor to be used for executing asynchronous send. If the
Executor is not set, {@link
+ * DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used.
+ *
+ * @param asyncSenderExecutor the instance of Executor
+ */
+ public void setAsyncSenderExecutor(final ExecutorService
asyncSenderExecutor) {
+ this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
+ }
+
private MessageBatch batch(Collection<Message> msgs) throws
MQClientException {
MessageBatch msgBatch;
try {
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index c225afd68..9540755fe 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -167,10 +167,7 @@ public void testSendMessageSync_WithBodyCompressed()
throws RemotingException, I
@Test
public void testSendMessageAsync_Success() throws RemotingException,
InterruptedException, MQBrokerException, MQClientException {
- ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1);
-
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
-
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
@@ -186,15 +183,11 @@ public void onException(Throwable e) {
}
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
- callbackExecutor.shutdown();
}
@Test
public void testSendMessageAsync() throws RemotingException,
MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(6);
- ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
-
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
-
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
SendCallback sendCallback = new SendCallback() {
@Override
@@ -226,16 +219,13 @@ public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg) {
producer.send(message,messageQueueSelector,null,sendCallback,1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
- callbackExecutor.shutdown();
assertThat(cc.get()).isEqualTo(6);
}
@Test
public void testSendMessageAsync_BodyCompressed() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
- ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+
final CountDownLatch countDownLatch = new CountDownLatch(1);
-
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
-
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(bigMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
@@ -251,7 +241,6 @@ public void onException(Throwable e) {
}
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
- callbackExecutor.shutdown();
}
@Test
diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh
index 3bd00bb30..df962b4a2 100644
--- a/distribution/bin/runbroker.sh
+++ b/distribution/bin/runbroker.sh
@@ -37,7 +37,7 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
-JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
+JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
-XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime
-XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services