This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b35f3e4 [ISSUE #3286] Replace Timer with ScheduledExecutorService (#3287)
b35f3e4 is described below
commit b35f3e4459e8cf32116bb15133aaa27d5ff63cfb
Author: Git_Yang <[email protected]>
AuthorDate: Wed Nov 10 19:43:42 2021 +0800
[ISSUE #3286] Replace Timer with ScheduledExecutorService (#3287)
Signed-off-by: zhangyang21 <[email protected]>
---
.../store/schedule/ScheduleMessageService.java | 48 ++++++++++++----------
1 file changed, 26 insertions(+), 22 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index e0e7b95..c45287f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -19,13 +19,15 @@ package org.apache.rocketmq.store.schedule;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -60,7 +62,8 @@ public class ScheduleMessageService extends ConfigManager {
new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
- private Timer timer;
+ private ScheduledExecutorService deliverExecutorService;
+ private int deliverThreadPoolNums =
Runtime.getRuntime().availableProcessors();
private MessageStore writeMessageStore;
private int maxDelayLevel;
@@ -113,7 +116,7 @@ public class ScheduleMessageService extends ConfigManager {
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
- this.timer = new Timer("ScheduleMessageTimerThread", true);
+ this.deliverExecutorService = new
ScheduledThreadPoolExecutor(deliverThreadPoolNums, new
ThreadFactoryImpl("ScheduleMessageTimerThread_"));
for (Map.Entry<Integer, Long> entry :
this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
@@ -123,11 +126,11 @@ public class ScheduleMessageService extends ConfigManager
{
}
if (timeDelay != null) {
- this.timer.schedule(new
DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
+ this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME,
TimeUnit.MILLISECONDS);
}
}
- this.timer.scheduleAtFixedRate(new TimerTask() {
+ this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -139,16 +142,14 @@ public class ScheduleMessageService extends ConfigManager
{
log.error("scheduleAtFixedRate flush exception", e);
}
}
- }, 10000,
this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
+ }, 10000,
this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(),
TimeUnit.MILLISECONDS);
}
}
public void shutdown() {
- if (this.started.compareAndSet(true, false)) {
- if (null != this.timer)
- this.timer.cancel();
+ if (this.started.compareAndSet(true, false) && null !=
this.deliverExecutorService) {
+ this.deliverExecutorService.shutdownNow();
}
-
}
public boolean isStarted() {
@@ -159,10 +160,12 @@ public class ScheduleMessageService extends ConfigManager
{
return maxDelayLevel;
}
+ @Override
public String encode() {
return this.encode(false);
}
+ @Override
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
@@ -223,6 +226,7 @@ public class ScheduleMessageService extends ConfigManager {
}
}
+ @Override
public String encode(final boolean prettyFormat) {
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new
DelayOffsetSerializeWrapper();
delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
@@ -261,7 +265,7 @@ public class ScheduleMessageService extends ConfigManager {
return true;
}
- class DeliverDelayedMessageTimerTask extends TimerTask {
+ class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;
@@ -279,8 +283,8 @@ public class ScheduleMessageService extends ConfigManager {
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception",
e);
- ScheduleMessageService.this.timer.schedule(new
DeliverDelayedMessageTimerTask(
- this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
+
ScheduleMessageService.this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(
+ this.delayLevel, this.offset), DELAY_FOR_A_PERIOD,
TimeUnit.MILLISECONDS);
}
}
@@ -372,9 +376,9 @@ public class ScheduleMessageService extends ConfigManager {
log.error(
"ScheduleMessageService, a
message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(),
msgExt.getMsgId());
-
ScheduleMessageService.this.timer.schedule(
+
ScheduleMessageService.this.deliverExecutorService.schedule(
new
DeliverDelayedMessageTimerTask(this.delayLevel,
- nextOffset),
DELAY_FOR_A_PERIOD);
+ nextOffset),
DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
@@ -388,17 +392,17 @@ public class ScheduleMessageService extends ConfigManager
{
}
}
} else {
- ScheduleMessageService.this.timer.schedule(
+
ScheduleMessageService.this.deliverExecutorService.schedule(
new
DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
- countdown);
+ countdown, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i /
ConsumeQueue.CQ_STORE_UNIT_SIZE);
- ScheduleMessageService.this.timer.schedule(new
DeliverDelayedMessageTimerTask(
- this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
+
ScheduleMessageService.this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(
+ this.delayLevel, nextOffset), DELAY_FOR_A_WHILE,
TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
@@ -424,8 +428,8 @@ public class ScheduleMessageService extends ConfigManager {
}
} // end of if (cq != null)
- ScheduleMessageService.this.timer.schedule(new
DeliverDelayedMessageTimerTask(this.delayLevel,
- failScheduleOffset), DELAY_FOR_A_WHILE);
+ ScheduleMessageService.this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(this.delayLevel,
+ failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
}
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {