This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3a2b172 [ISSUE #3006]Replace ScheduledExecutorService instead of
Timer to avoid affecting other tasks during exception (#3001)
3a2b172 is described below
commit 3a2b172660d489dad3365d777b995abafe50a64d
Author: zhenhe <[email protected]>
AuthorDate: Wed Aug 4 13:59:25 2021 +0800
[ISSUE #3006]Replace ScheduledExecutorService instead of Timer to avoid
affecting other tasks during exception (#3001)
* 采用ScheduledExecutorService替代Timer,避免异常捕获时影响其他任务
多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。
* optimize imports
* Add @Override annotation
* Revert "Add @Override annotation"
This reverts commit 3ddccd88022db33361a2af08b36b0c8f5d963f48.
Co-authored-by: wuzh <[email protected]>
---
.../rocketmq/example/benchmark/Consumer.java | 30 ++++++++------
.../rocketmq/example/benchmark/Producer.java | 44 ++++++++++++---------
.../example/benchmark/TransactionProducer.java | 46 ++++++++++++----------
3 files changed, 68 insertions(+), 52 deletions(-)
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d3ac36c..154e6ed 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,18 +17,11 @@
package org.apache.rocketmq.example.benchmark;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
public class Consumer {
public static void main(String[] args) throws MQClientException,
IOException {
@@ -71,11 +74,12 @@ public class Consumer {
final StatsBenchmarkConsumer statsBenchmarkConsumer = new
StatsBenchmarkConsumer();
- final Timer timer = new Timer("BenchmarkTimerThread", true);
+ ScheduledExecutorService executorService = new
ScheduledThreadPoolExecutor(1,
+ new
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
- timer.scheduleAtFixedRate(new TimerTask() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
@@ -83,9 +87,9 @@ public class Consumer {
snapshotList.removeFirst();
}
}
- }, 1000, 1000);
+ }, 1000, 1000, TimeUnit.MILLISECONDS);
- timer.scheduleAtFixedRate(new TimerTask() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
@@ -116,7 +120,7 @@ public class Consumer {
e.printStackTrace();
}
}
- }, 10000, 10000);
+ }, 10000, 10000, TimeUnit.MILLISECONDS);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group,
rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 32d4b9f..b198a0f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -16,32 +16,34 @@
*/
package org.apache.rocketmq.example.benchmark;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
public class Producer {
public static void main(String[] args) throws MQClientException,
UnsupportedEncodingException {
@@ -73,7 +75,8 @@ public class Producer {
final StatsBenchmarkProducer statsBenchmark = new
StatsBenchmarkProducer();
- final Timer timer = new Timer("BenchmarkTimerThread", true);
+ ScheduledExecutorService executorService = new
ScheduledThreadPoolExecutor(1,
+ new
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
@@ -87,7 +90,7 @@ public class Producer {
}
}
- timer.scheduleAtFixedRate(new TimerTask() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot());
@@ -95,9 +98,9 @@ public class Producer {
snapshotList.removeFirst();
}
}
- }, 1000, 1000);
+ }, 1000, 1000, TimeUnit.MILLISECONDS);
- timer.scheduleAtFixedRate(new TimerTask() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
doPrintStats(snapshotList, statsBenchmark, false);
@@ -112,7 +115,7 @@ public class Producer {
e.printStackTrace();
}
}
- }, 10000, 10000);
+ }, 10000, 10000, TimeUnit.MILLISECONDS);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
final DefaultMQProducer producer = new
DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
@@ -224,7 +227,12 @@ public class Producer {
try {
sendThreadPool.shutdown();
sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
- timer.cancel();
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+
if (snapshotList.size() > 1) {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 1b511d8..c4f14a4 100644
---
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -17,26 +17,11 @@
package org.apache.rocketmq.example.benchmark;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
@@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.srvutil.ServerUtil;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
public class TransactionProducer {
private static final long START_TIME = System.currentTimeMillis();
private static final AtomicLong MSG_COUNT = new AtomicLong(0);
@@ -75,11 +78,12 @@ public class TransactionProducer {
final StatsBenchmarkTProducer statsBenchmark = new
StatsBenchmarkTProducer();
- final Timer timer = new Timer("BenchmarkTimerThread", true);
+ ScheduledExecutorService executorService = new
ScheduledThreadPoolExecutor(1,
+ new
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Snapshot> snapshotList = new LinkedList<>();
- timer.scheduleAtFixedRate(new TimerTask() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot());
@@ -87,9 +91,9 @@ public class TransactionProducer {
snapshotList.removeFirst();
}
}
- }, 1000, 1000);
+ }, 1000, 1000, TimeUnit.MILLISECONDS);
- timer.scheduleAtFixedRate(new TimerTask() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Snapshot begin = snapshotList.getFirst();
@@ -121,7 +125,7 @@ public class TransactionProducer {
e.printStackTrace();
}
}
- }, 10000, 10000);
+ }, 10000, 10000, TimeUnit.MILLISECONDS);
final TransactionListener transactionCheckListener = new
TransactionListenerImpl(statsBenchmark, config);
final TransactionMQProducer producer = new TransactionMQProducer(