This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 17aa86c61 [ISSUE #6324] Improving compact topic stability (#6353)
17aa86c61 is described below

commit 17aa86c618663743d48844337000ec18c1d5b327
Author: fuyou001 <[email protected]>
AuthorDate: Tue Mar 28 10:41:23 2023 +0800

    [ISSUE #6324] Improving compact topic stability (#6353)
    
    Co-authored-by: RongtongJin <[email protected]>
---
 .../apache/rocketmq/store/config/MessageStoreConfig.java |  2 +-
 .../org/apache/rocketmq/store/kv/CompactionStore.java    | 11 +++++------
 .../tools/command/message/DumpCompactionLogCommand.java  | 16 ++++++++++------
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 6243d3974..f5ad70543 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -59,7 +59,7 @@ public class MessageStoreConfig {
 
     private int maxOffsetMapSize = 100 * 1024 * 1024;
 
-    private int compactionThreadNum = 0;
+    private int compactionThreadNum = 6;
 
     private boolean enableCompaction = true;
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java 
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index 1142c8153..c9bcff182 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store.kv;
 
+import java.util.Random;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -72,11 +73,8 @@ public class CompactionStore {
         this.compactionLogPath = Paths.get(compactionPath, 
COMPACTION_LOG_DIR).toString();
         this.compactionCqPath = Paths.get(compactionPath, 
COMPACTION_CQ_DIR).toString();
         this.positionMgr = new CompactionPositionMgr(compactionPath);
-        if (config.getCompactionThreadNum() <= 0) {
-            this.compactionThreadNum = 
Runtime.getRuntime().availableProcessors();
-        } else {
-            this.compactionThreadNum = config.getCompactionThreadNum();
-        }
+        this.compactionThreadNum = 
Math.min(Runtime.getRuntime().availableProcessors(), 
config.getCompactionThreadNum());
+
         this.compactionSchedule = 
Executors.newScheduledThreadPool(this.compactionThreadNum,
             new ThreadFactoryImpl("compactionSchedule_"));
         this.offsetMapSize = config.getMaxOffsetMapSize() / 
compactionThreadNum;
@@ -151,7 +149,8 @@ public class CompactionStore {
                 try {
                     v = new CompactionLog(defaultMessageStore, this, topic, 
queueId);
                     v.load(true);
-                    compactionSchedule.scheduleWithFixedDelay(v::doCompaction, 
compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
+                    int randomDelay = 1000 + new 
Random(System.currentTimeMillis()).nextInt(compactionInterval);
+                    compactionSchedule.scheduleWithFixedDelay(v::doCompaction, 
compactionInterval + randomDelay, compactionInterval + randomDelay, 
TimeUnit.MILLISECONDS);
                 } catch (IOException e) {
                     log.error("create compactionLog exception: ", e);
                     return null;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
index b1c8c33cb..ae6d9bdcf 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
@@ -86,12 +86,16 @@ public class DumpCompactionLogCommand implements SubCommand 
{
                         bb.rewind();
                     }
 
-                    MessageExt messageExt = MessageDecoder.decode(bb, false, 
false);
-                    if (messageExt == null) {
-                        break;
-                    } else {
-                        current += size;
-                        System.out.printf(messageExt + "\n");
+                    try {
+                        MessageExt messageExt = MessageDecoder.decode(bb, 
false, false);
+                        if (messageExt == null) {
+                            break;
+                        } else {
+                            current += size;
+                            System.out.printf(messageExt + "\n");
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
                     }
                 }
 

Reply via email to