This is an automated email from the ASF dual-hosted git repository.
fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7dbcdc8f9 [ISSUE #6372] new scheduledExecutor clean consume queue (#6376)
7dbcdc8f9 is described below
commit 7dbcdc8f9665141ccdf4b2e2492c92a4cf90ff58
Author: fuyou001 <[email protected]>
AuthorDate: Fri Mar 17 16:48:50 2023 +0800
[ISSUE #6372] new scheduledExecutor clean consume queue (#6376)
---
.../apache/rocketmq/store/DefaultMessageStore.java | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 9f3c79b94..403cb9ad1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -200,6 +200,9 @@ public class DefaultMessageStore implements MessageStore {
private long stateMachineVersion = 0L;
+ private final ScheduledExecutorService scheduledCleanQueueExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
+
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
@@ -456,7 +459,11 @@ public class DefaultMessageStore implements MessageStore {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
+ this.scheduledCleanQueueExecutorService.shutdown();
+
try {
+ this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
+ this.scheduledCleanQueueExecutorService.awaitTermination(3, TimeUnit.SECONDS);
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
LOGGER.error("shutdown Exception, ", e);
@@ -1771,6 +1778,14 @@ public class DefaultMessageStore implements MessageStore {
}
}, 1, 1, TimeUnit.SECONDS);
+ this.scheduledCleanQueueExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ DefaultMessageStore.this.cleanQueueFilesPeriodically();
+ }
+ }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
+
+
// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
// @Override
// public void run() {
@@ -1781,8 +1796,11 @@ public class DefaultMessageStore implements MessageStore {
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
- this.cleanConsumeQueueService.run();
+ }
+
+ private void cleanQueueFilesPeriodically() {
this.correctLogicOffsetService.run();
+ this.cleanConsumeQueueService.run();
}
private void checkSelf() {