This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0f1ff25512 [ISSUE #6881] Fix scheduled messages are replayed bug (#6882)
0f1ff25512 is described below
commit 0f1ff255128f723402298560b757d92784921316
Author: gaoyf <[email protected]>
AuthorDate: Sun Jun 11 10:31:30 2023 +0800
[ISSUE #6881] Fix scheduled messages are replayed bug (#6882)
* fix scheduled messages are replayed bug
* scheduledPersistService reset to final and constructed in the constructor
---
.../broker/schedule/ScheduleMessageService.java | 17 ++++-------------
1 file changed, 4 insertions(+), 13 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 196b78f83c..2a4ace0985 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -92,14 +92,7 @@ public class ScheduleMessageService extends ConfigManager {
this.brokerController = brokerController;
this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
scheduledPersistService = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
- scheduledPersistService.scheduleAtFixedRate(() -> {
- try {
- ScheduleMessageService.this.persist();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate flush exception", e);
- }
- }, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
+ new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
}
public static int queueId2DelayLevel(final int queueId) {
@@ -161,15 +154,13 @@ public class ScheduleMessageService extends ConfigManager {
}
}
- this.deliverExecutorService.scheduleAtFixedRate(() -> {
+ scheduledPersistService.scheduleAtFixedRate(() -> {
try {
- if (started.get()) {
- ScheduleMessageService.this.persist();
- }
+ ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
- }, 10000, this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
+ }, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}