This is an automated email from the ASF dual-hosted git repository.

fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7dbcdc8f9 [ISSUE #6372] new scheduledExecutor clean consume queue (#6376)
7dbcdc8f9 is described below

commit 7dbcdc8f9665141ccdf4b2e2492c92a4cf90ff58
Author: fuyou001 <[email protected]>
AuthorDate: Fri Mar 17 16:48:50 2023 +0800

    [ISSUE #6372] new scheduledExecutor clean consume queue (#6376)
---
 .../apache/rocketmq/store/DefaultMessageStore.java   | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 9f3c79b94..403cb9ad1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -200,6 +200,9 @@ public class DefaultMessageStore implements MessageStore {
 
     private long stateMachineVersion = 0L;
 
+    private final ScheduledExecutorService scheduledCleanQueueExecutorService =
+        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -456,7 +459,11 @@ public class DefaultMessageStore implements MessageStore {
             this.shutdown = true;
 
             this.scheduledExecutorService.shutdown();
+            this.scheduledCleanQueueExecutorService.shutdown();
+
             try {
+                this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
+                this.scheduledCleanQueueExecutorService.awaitTermination(3, TimeUnit.SECONDS);
                 Thread.sleep(1000 * 3);
             } catch (InterruptedException e) {
                 LOGGER.error("shutdown Exception, ", e);
@@ -1771,6 +1778,14 @@ public class DefaultMessageStore implements MessageStore {
             }
         }, 1, 1, TimeUnit.SECONDS);
 
+        this.scheduledCleanQueueExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                DefaultMessageStore.this.cleanQueueFilesPeriodically();
+            }
+        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
+
+
         // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
         // @Override
         // public void run() {
@@ -1781,8 +1796,11 @@ public class DefaultMessageStore implements MessageStore {
 
     private void cleanFilesPeriodically() {
         this.cleanCommitLogService.run();
-        this.cleanConsumeQueueService.run();
+    }
+
+    private void cleanQueueFilesPeriodically() {
         this.correctLogicOffsetService.run();
+        this.cleanConsumeQueueService.run();
     }
 
     private void checkSelf() {

Reply via email to