This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 17aa86c61 [ISSUE #6324] Improving compact topic stability (#6353)
17aa86c61 is described below
commit 17aa86c618663743d48844337000ec18c1d5b327
Author: fuyou001 <[email protected]>
AuthorDate: Tue Mar 28 10:41:23 2023 +0800
[ISSUE #6324] Improving compact topic stability (#6353)
Co-authored-by: RongtongJin <[email protected]>
---
.../apache/rocketmq/store/config/MessageStoreConfig.java | 2 +-
.../org/apache/rocketmq/store/kv/CompactionStore.java | 11 +++++------
.../tools/command/message/DumpCompactionLogCommand.java | 16 ++++++++++------
3 files changed, 16 insertions(+), 13 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 6243d3974..f5ad70543 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -59,7 +59,7 @@ public class MessageStoreConfig {
private int maxOffsetMapSize = 100 * 1024 * 1024;
- private int compactionThreadNum = 0;
+ private int compactionThreadNum = 6;
private boolean enableCompaction = true;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index 1142c8153..c9bcff182 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.kv;
+import java.util.Random;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -72,11 +73,8 @@ public class CompactionStore {
this.compactionLogPath = Paths.get(compactionPath,
COMPACTION_LOG_DIR).toString();
this.compactionCqPath = Paths.get(compactionPath,
COMPACTION_CQ_DIR).toString();
this.positionMgr = new CompactionPositionMgr(compactionPath);
- if (config.getCompactionThreadNum() <= 0) {
- this.compactionThreadNum =
Runtime.getRuntime().availableProcessors();
- } else {
- this.compactionThreadNum = config.getCompactionThreadNum();
- }
+ this.compactionThreadNum =
Math.min(Runtime.getRuntime().availableProcessors(),
config.getCompactionThreadNum());
+
this.compactionSchedule =
Executors.newScheduledThreadPool(this.compactionThreadNum,
new ThreadFactoryImpl("compactionSchedule_"));
this.offsetMapSize = config.getMaxOffsetMapSize() /
compactionThreadNum;
@@ -151,7 +149,8 @@ public class CompactionStore {
try {
v = new CompactionLog(defaultMessageStore, this, topic,
queueId);
v.load(true);
- compactionSchedule.scheduleWithFixedDelay(v::doCompaction,
compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
+ int randomDelay = 1000 + new
Random(System.currentTimeMillis()).nextInt(compactionInterval);
+ compactionSchedule.scheduleWithFixedDelay(v::doCompaction,
compactionInterval + randomDelay, compactionInterval + randomDelay,
TimeUnit.MILLISECONDS);
} catch (IOException e) {
log.error("create compactionLog exception: ", e);
return null;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
index b1c8c33cb..ae6d9bdcf 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
@@ -86,12 +86,16 @@ public class DumpCompactionLogCommand implements SubCommand
{
bb.rewind();
}
- MessageExt messageExt = MessageDecoder.decode(bb, false,
false);
- if (messageExt == null) {
- break;
- } else {
- current += size;
- System.out.printf(messageExt + "\n");
+ try {
+ MessageExt messageExt = MessageDecoder.decode(bb,
false, false);
+ if (messageExt == null) {
+ break;
+ } else {
+ current += size;
+ System.out.printf(messageExt + "\n");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
}
}