This is an automated email from the ASF dual-hosted git repository.

fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cbd1d3819 flush() in compactionTopic (#6498)
cbd1d3819 is described below

commit cbd1d38199095a44a5a114438e82d0f47be49849
Author: guyinyou <[email protected]>
AuthorDate: Wed Mar 29 11:46:03 2023 +0800

    flush() in compactionTopic (#6498)
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java   |  2 +-
 .../main/java/org/apache/rocketmq/store/kv/CompactionLog.java | 11 +++++++++++
 .../java/org/apache/rocketmq/store/kv/CompactionStore.java    |  9 +++++++++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dcdae008c..7798e89b8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2580,7 +2580,7 @@ public class DefaultMessageStore implements MessageStore {
             }
 
             if (messageStoreConfig.isEnableCompaction()) {
-                compactionStore.flushCQ(flushConsumeQueueLeastPages);
+                compactionStore.flush(flushConsumeQueueLeastPages);
             }
 
             if (0 == flushConsumeQueueLeastPages) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
index 793f6203e..39a7a6d38 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
@@ -703,6 +703,7 @@ public class CompactionLog {
 
         src.getMappedFiles().forEach(mappedFile -> {
             try {
+                mappedFile.flush(0);
                 mappedFile.moveToParent();
             } catch (IOException e) {
                 log.error("move file {} to parent directory exception: ", mappedFile.getFileName());
@@ -742,6 +743,7 @@ public class CompactionLog {
         fileListToDelete.forEach(MappedFile::renameToDelete);
         compactMq.getMappedFiles().forEach(mappedFile -> {
             try {
+                mappedFile.flush(0);
                 mappedFile.moveToParent();
             } catch (IOException e) {
                 log.error("move consume queue file {} to parent directory exception: ", mappedFile.getFileName(), e);
@@ -775,6 +777,15 @@ public class CompactionLog {
 //        return compactionScq;
 //    }
 
+    public void flush(int flushLeastPages) {
+        this.flushLog(flushLeastPages);
+        this.flushCQ(flushLeastPages);
+    }
+
+    public void flushLog(int flushLeastPages) {
+        getLog().flush(flushLeastPages);
+    }
+
     public void flushCQ(int flushLeastPages) {
         getCQ().flush(flushLeastPages);
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index c9bcff182..b492cd5f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -180,6 +180,14 @@ public class CompactionStore {
 
     }
 
+    public void flush(int flushLeastPages) {
+        compactionLogTable.values().forEach(log -> log.flush(flushLeastPages));
+    }
+
+    public void flushLog(int flushLeastPages) {
+        compactionLogTable.values().forEach(log -> log.flushLog(flushLeastPages));
+    }
+
     public void flushCQ(int flushLeastPages) {
         compactionLogTable.values().forEach(log -> log.flushCQ(flushLeastPages));
     }
@@ -189,6 +197,7 @@ public class CompactionStore {
     }
 
     public void shutdown() {
+        this.flush(0);
         positionMgr.persist();
         compactionSchedule.shutdown();
         try {

Reply via email to