This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3a2b172  [ISSUE #3006] Use ScheduledExecutorService instead of Timer to avoid affecting other tasks when an exception is thrown (#3001)
3a2b172 is described below

commit 3a2b172660d489dad3365d777b995abafe50a64d
Author: zhenhe <[email protected]>
AuthorDate: Wed Aug 4 13:59:25 2021 +0800

    [ISSUE #3006] Use ScheduledExecutorService instead of Timer to avoid affecting other tasks when an exception is thrown (#3001)
    
    * 采用ScheduledExecutorService替代Timer,避免异常捕获时影响其他任务
    
    
    多线程并行处理定时任务时,Timer运行多个TimerTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。
    
    * optimize imports
    
    * Add @Override annotation
    
    * Revert "Add @Override annotation"
    
    This reverts commit 3ddccd88022db33361a2af08b36b0c8f5d963f48.
    
    Co-authored-by: wuzh <[email protected]>
---
 .../rocketmq/example/benchmark/Consumer.java       | 30 ++++++++------
 .../rocketmq/example/benchmark/Producer.java       | 44 ++++++++++++---------
 .../example/benchmark/TransactionProducer.java     | 46 ++++++++++++----------
 3 files changed, 68 insertions(+), 52 deletions(-)

diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d3ac36c..154e6ed 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,18 +17,11 @@
 
 package org.apache.rocketmq.example.benchmark;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class Consumer {
 
     public static void main(String[] args) throws MQClientException, 
IOException {
@@ -71,11 +74,12 @@ public class Consumer {
 
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new 
StatsBenchmarkConsumer();
 
-        final Timer timer = new Timer("BenchmarkTimerThread", true);
+        ScheduledExecutorService executorService = new 
ScheduledThreadPoolExecutor(1,
+                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
 
         final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
 
-        timer.scheduleAtFixedRate(new TimerTask() {
+        executorService.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
@@ -83,9 +87,9 @@ public class Consumer {
                     snapshotList.removeFirst();
                 }
             }
-        }, 1000, 1000);
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
 
-        timer.scheduleAtFixedRate(new TimerTask() {
+        executorService.scheduleAtFixedRate(new TimerTask() {
             private void printStats() {
                 if (snapshotList.size() >= 10) {
                     Long[] begin = snapshotList.getFirst();
@@ -116,7 +120,7 @@ public class Consumer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000);
+        }, 10000, 10000, TimeUnit.MILLISECONDS);
 
         RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, 
rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 32d4b9f..b198a0f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -16,32 +16,34 @@
  */
 package org.apache.rocketmq.example.benchmark;
 
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class Producer {
 
     public static void main(String[] args) throws MQClientException, 
UnsupportedEncodingException {
@@ -73,7 +75,8 @@ public class Producer {
 
         final StatsBenchmarkProducer statsBenchmark = new 
StatsBenchmarkProducer();
 
-        final Timer timer = new Timer("BenchmarkTimerThread", true);
+        ScheduledExecutorService executorService = new 
ScheduledThreadPoolExecutor(1,
+                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
 
         final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
 
@@ -87,7 +90,7 @@ public class Producer {
             }
         }
 
-        timer.scheduleAtFixedRate(new TimerTask() {
+        executorService.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 snapshotList.addLast(statsBenchmark.createSnapshot());
@@ -95,9 +98,9 @@ public class Producer {
                     snapshotList.removeFirst();
                 }
             }
-        }, 1000, 1000);
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
 
-        timer.scheduleAtFixedRate(new TimerTask() {
+        executorService.scheduleAtFixedRate(new TimerTask() {
             private void printStats() {
                 if (snapshotList.size() >= 10) {
                     doPrintStats(snapshotList,  statsBenchmark, false);
@@ -112,7 +115,7 @@ public class Producer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000);
+        }, 10000, 10000, TimeUnit.MILLISECONDS);
 
         RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
         final DefaultMQProducer producer = new 
DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
@@ -224,7 +227,12 @@ public class Producer {
         try {
             sendThreadPool.shutdown();
             sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
-            timer.cancel();
+            executorService.shutdown();
+            try {
+                executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+            }
+
             if (snapshotList.size() > 1) {
                 doPrintStats(snapshotList, statsBenchmark, true);
             } else {
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 1b511d8..c4f14a4 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -17,26 +17,11 @@
 
 package org.apache.rocketmq.example.benchmark;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 public class TransactionProducer {
     private static final long START_TIME = System.currentTimeMillis();
     private static final AtomicLong MSG_COUNT = new AtomicLong(0);
@@ -75,11 +78,12 @@ public class TransactionProducer {
 
         final StatsBenchmarkTProducer statsBenchmark = new 
StatsBenchmarkTProducer();
 
-        final Timer timer = new Timer("BenchmarkTimerThread", true);
+        ScheduledExecutorService executorService = new 
ScheduledThreadPoolExecutor(1,
+                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
 
         final LinkedList<Snapshot> snapshotList = new LinkedList<>();
 
-        timer.scheduleAtFixedRate(new TimerTask() {
+        executorService.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 snapshotList.addLast(statsBenchmark.createSnapshot());
@@ -87,9 +91,9 @@ public class TransactionProducer {
                     snapshotList.removeFirst();
                 }
             }
-        }, 1000, 1000);
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
 
-        timer.scheduleAtFixedRate(new TimerTask() {
+        executorService.scheduleAtFixedRate(new TimerTask() {
             private void printStats() {
                 if (snapshotList.size() >= 10) {
                     Snapshot begin = snapshotList.getFirst();
@@ -121,7 +125,7 @@ public class TransactionProducer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000);
+        }, 10000, 10000, TimeUnit.MILLISECONDS);
 
         final TransactionListener transactionCheckListener = new 
TransactionListenerImpl(statsBenchmark, config);
         final TransactionMQProducer producer = new TransactionMQProducer(

Reply via email to