This is an automated email from the ASF dual-hosted git repository.
fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cbd1d3819 flush() in compactionTopic (#6498)
cbd1d3819 is described below
commit cbd1d38199095a44a5a114438e82d0f47be49849
Author: guyinyou <[email protected]>
AuthorDate: Wed Mar 29 11:46:03 2023 +0800
flush() in compactionTopic (#6498)
Co-authored-by: guyinyou <[email protected]>
---
.../java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +-
.../main/java/org/apache/rocketmq/store/kv/CompactionLog.java | 11 +++++++++++
.../java/org/apache/rocketmq/store/kv/CompactionStore.java | 9 +++++++++
3 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dcdae008c..7798e89b8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2580,7 +2580,7 @@ public class DefaultMessageStore implements MessageStore {
}
if (messageStoreConfig.isEnableCompaction()) {
- compactionStore.flushCQ(flushConsumeQueueLeastPages);
+ compactionStore.flush(flushConsumeQueueLeastPages);
}
if (0 == flushConsumeQueueLeastPages) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
index 793f6203e..39a7a6d38 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
@@ -703,6 +703,7 @@ public class CompactionLog {
src.getMappedFiles().forEach(mappedFile -> {
try {
+ mappedFile.flush(0);
mappedFile.moveToParent();
} catch (IOException e) {
log.error("move file {} to parent directory exception: ",
mappedFile.getFileName());
@@ -742,6 +743,7 @@ public class CompactionLog {
fileListToDelete.forEach(MappedFile::renameToDelete);
compactMq.getMappedFiles().forEach(mappedFile -> {
try {
+ mappedFile.flush(0);
mappedFile.moveToParent();
} catch (IOException e) {
log.error("move consume queue file {} to parent directory
exception: ", mappedFile.getFileName(), e);
@@ -775,6 +777,15 @@ public class CompactionLog {
// return compactionScq;
// }
+ public void flush(int flushLeastPages) {
+ this.flushLog(flushLeastPages);
+ this.flushCQ(flushLeastPages);
+ }
+
+ public void flushLog(int flushLeastPages) {
+ getLog().flush(flushLeastPages);
+ }
+
public void flushCQ(int flushLeastPages) {
getCQ().flush(flushLeastPages);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index c9bcff182..b492cd5f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -180,6 +180,14 @@ public class CompactionStore {
}
+ public void flush(int flushLeastPages) {
+ compactionLogTable.values().forEach(log -> log.flush(flushLeastPages));
+ }
+
+ public void flushLog(int flushLeastPages) {
+ compactionLogTable.values().forEach(log ->
log.flushLog(flushLeastPages));
+ }
+
public void flushCQ(int flushLeastPages) {
compactionLogTable.values().forEach(log ->
log.flushCQ(flushLeastPages));
}
@@ -189,6 +197,7 @@ public class CompactionStore {
}
public void shutdown() {
+ this.flush(0);
positionMgr.persist();
compactionSchedule.shutdown();
try {