This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b35f3e4  [ISSUE #3286] Replace Timer with ScheduledExecutorService (#3287)
b35f3e4 is described below

commit b35f3e4459e8cf32116bb15133aaa27d5ff63cfb
Author: Git_Yang <[email protected]>
AuthorDate: Wed Nov 10 19:43:42 2021 +0800

    [ISSUE #3286] Replace Timer with ScheduledExecutorService (#3287)
    
    Signed-off-by: zhangyang21 <[email protected]>
---
 .../store/schedule/ScheduleMessageService.java     | 48 ++++++++++++----------
 1 file changed, 26 insertions(+), 22 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index e0e7b95..c45287f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -19,13 +19,15 @@ package org.apache.rocketmq.store.schedule;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.topic.TopicValidator;
@@ -60,7 +62,8 @@ public class ScheduleMessageService extends ConfigManager {
         new ConcurrentHashMap<Integer, Long>(32);
     private final DefaultMessageStore defaultMessageStore;
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private Timer timer;
+    private ScheduledExecutorService deliverExecutorService;
+    private int deliverThreadPoolNums = 
Runtime.getRuntime().availableProcessors();
     private MessageStore writeMessageStore;
     private int maxDelayLevel;
 
@@ -113,7 +116,7 @@ public class ScheduleMessageService extends ConfigManager {
     public void start() {
         if (started.compareAndSet(false, true)) {
             super.load();
-            this.timer = new Timer("ScheduleMessageTimerThread", true);
+            this.deliverExecutorService = new 
ScheduledThreadPoolExecutor(deliverThreadPoolNums, new 
ThreadFactoryImpl("ScheduleMessageTimerThread_"));
             for (Map.Entry<Integer, Long> entry : 
this.delayLevelTable.entrySet()) {
                 Integer level = entry.getKey();
                 Long timeDelay = entry.getValue();
@@ -123,11 +126,11 @@ public class ScheduleMessageService extends ConfigManager {
                 }
 
                 if (timeDelay != null) {
-                    this.timer.schedule(new 
DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
+                    this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, 
TimeUnit.MILLISECONDS);
                 }
             }
 
-            this.timer.scheduleAtFixedRate(new TimerTask() {
+            this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
 
                 @Override
                 public void run() {
@@ -139,16 +142,14 @@ public class ScheduleMessageService extends ConfigManager {
                         log.error("scheduleAtFixedRate flush exception", e);
                     }
                 }
-            }, 10000, 
this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
+            }, 10000, 
this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), 
TimeUnit.MILLISECONDS);
         }
     }
 
     public void shutdown() {
-        if (this.started.compareAndSet(true, false)) {
-            if (null != this.timer)
-                this.timer.cancel();
+        if (this.started.compareAndSet(true, false) && null != 
this.deliverExecutorService) {
+            this.deliverExecutorService.shutdownNow();
         }
-
     }
 
     public boolean isStarted() {
@@ -159,10 +160,12 @@ public class ScheduleMessageService extends ConfigManager {
         return maxDelayLevel;
     }
 
+    @Override
     public String encode() {
         return this.encode(false);
     }
 
+    @Override
     public boolean load() {
         boolean result = super.load();
         result = result && this.parseDelayLevel();
@@ -223,6 +226,7 @@ public class ScheduleMessageService extends ConfigManager {
         }
     }
 
+    @Override
     public String encode(final boolean prettyFormat) {
         DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new 
DelayOffsetSerializeWrapper();
         delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
@@ -261,7 +265,7 @@ public class ScheduleMessageService extends ConfigManager {
         return true;
     }
 
-    class DeliverDelayedMessageTimerTask extends TimerTask {
+    class DeliverDelayedMessageTimerTask implements Runnable {
         private final int delayLevel;
         private final long offset;
 
@@ -279,8 +283,8 @@ public class ScheduleMessageService extends ConfigManager {
             } catch (Exception e) {
                 // XXX: warn and notify me
                 log.error("ScheduleMessageService, executeOnTimeup exception", 
e);
-                ScheduleMessageService.this.timer.schedule(new 
DeliverDelayedMessageTimerTask(
-                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
+                
ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(
+                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, 
TimeUnit.MILLISECONDS);
             }
         }
 
@@ -372,9 +376,9 @@ public class ScheduleMessageService extends ConfigManager {
                                             log.error(
                                                 "ScheduleMessageService, a 
message time up, but reput it failed, topic: {} msgId {}",
                                                 msgExt.getTopic(), 
msgExt.getMsgId());
-                                            
ScheduleMessageService.this.timer.schedule(
+                                            
ScheduleMessageService.this.deliverExecutorService.schedule(
                                                 new 
DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), 
DELAY_FOR_A_PERIOD);
+                                                    nextOffset), 
DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
                                             
ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                 nextOffset);
                                             return;
@@ -388,17 +392,17 @@ public class ScheduleMessageService extends ConfigManager {
                                     }
                                 }
                             } else {
-                                ScheduleMessageService.this.timer.schedule(
+                                
ScheduleMessageService.this.deliverExecutorService.schedule(
                                     new 
DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown);
+                                    countdown, TimeUnit.MILLISECONDS);
                                 
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                 return;
                             }
                         } // end of for
 
                         nextOffset = offset + (i / 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.timer.schedule(new 
DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
+                        
ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(
+                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, 
TimeUnit.MILLISECONDS);
                         
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                         return;
                     } finally {
@@ -424,8 +428,8 @@ public class ScheduleMessageService extends ConfigManager {
                 }
             } // end of if (cq != null)
 
-            ScheduleMessageService.this.timer.schedule(new 
DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE);
+            ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(this.delayLevel,
+                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
         }
 
         private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {

Reply via email to