This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha by this push:
new 7be401e [ISSUE #3708] add CorrectLogicOffsetService to periodically
correct min logic offset (#3722)
7be401e is described below
commit 7be401e7774ac09d07edef6edb79a3b9c0636392
Author: Hongjian Fei <[email protected]>
AuthorDate: Mon Jan 10 18:44:12 2022 +0800
[ISSUE #3708] add CorrectLogicOffsetService to periodically correct min
logic offset (#3722)
* [ISSUE #3708] add CorrectLogicOffsetService to periodically correct min
logic offset; refactor QueueOffsetAssigner.
* Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio to
get around configuration protection in unit-test.
* Fix check style.
---
.../apache/rocketmq/common/attribute/CQType.java | 3 +-
.../org/apache/rocketmq/store/ConsumeQueue.java | 9 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 124 +++++++++++++++++++++
.../rocketmq/store/queue/BatchConsumeQueue.java | 9 +-
.../rocketmq/store/queue/ConsumeQueueStore.java | 2 +-
.../rocketmq/store/queue/QueueOffsetAssigner.java | 30 +++--
.../store/DefaultMessageStoreCleanFilesTest.java | 21 ++++
7 files changed, 173 insertions(+), 25 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
index 6bd6ad2..73ef218 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
@@ -19,6 +19,5 @@ package org.apache.rocketmq.common.attribute;
public enum CQType {
SimpleCQ,
- BatchCQ,
- MillionCQ;
+ BatchCQ
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 7763a0f..a1fc870 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.store;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -443,12 +442,8 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
@Override
public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner,
MessageExtBrokerInner msg, short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
- HashMap<String, Long> topicQueueTable =
queueOffsetAssigner.getTopicQueueTable();
-
- long topicOffset = topicQueueTable.computeIfAbsent(topicQueueKey, k ->
0L);
- topicQueueTable.put(topicQueueKey, topicOffset + messageNum);
-
- msg.setQueueOffset(topicOffset);
+ long queueOffset =
queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
+ msg.setQueueOffset(queueOffset);
}
private boolean putMessagePositionInfo(final long offset, final int size,
final long tagsCode,
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index da47be6..76165d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -32,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -49,6 +50,7 @@ import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -94,6 +96,8 @@ public class DefaultMessageStore implements MessageStore {
private final CleanConsumeQueueService cleanConsumeQueueService;
+ private final CorrectLogicOffsetService correctLogicOffsetService;
+
private final IndexService indexService;
private final AllocateMappedFileService allocateMappedFileService;
@@ -156,6 +160,7 @@ public class DefaultMessageStore implements MessageStore {
this.flushConsumeQueueService = new FlushConsumeQueueService();
this.cleanCommitLogService = new CleanCommitLogService();
this.cleanConsumeQueueService = new CleanConsumeQueueService();
+ this.correctLogicOffsetService = new CorrectLogicOffsetService();
this.storeStatsService = new StoreStatsService();
this.indexService = new IndexService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
@@ -1351,6 +1356,8 @@ public class DefaultMessageStore implements MessageStore {
long deleteCount = 0L;
deleteCount += this.cleanCommitLogService.run();
deleteCount += this.cleanConsumeQueueService.run();
+
+ this.correctLogicOffsetService.run();
return deleteCount;
}
@@ -1879,6 +1886,123 @@ public class DefaultMessageStore implements
MessageStore {
}
}
+    /**
+     * Detects consume queues whose minimum logic offset looks stale relative to the
+     * minimum physical offset of the commit log, and asks the consume queue store to
+     * delete their expired files.
+     *
+     * <p>Queues whose type is {@link CQType#SimpleCQ} are skipped by
+     * {@link #correctLogicMinOffset()}.
+     */
+    class CorrectLogicOffsetService {
+        /** Time in milliseconds of the last forced earliest-unit check; -1 until the first one. */
+        private long lastForceCorrectTime = -1L;
+
+        /**
+         * Runs one correction pass. Any {@link Throwable} raised by the pass is logged
+         * and not propagated to the caller.
+         */
+        public void run() {
+            try {
+                this.correctLogicMinOffset();
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        /**
+         * Decides whether the given consume queue needs its minimum logic offset corrected.
+         *
+         * @param logic consume queue to inspect; {@code null} yields {@code false}
+         * @param minPhyOffset minimum physical offset obtained by the caller via {@code getMinPhyOffset()}
+         * @param lastForceCorrectTimeCurRun value of {@code lastForceCorrectTime} captured at the
+         *        start of the current pass
+         * @return {@code true} if the caller should apply {@code doCorrect} to the queue
+         */
+        private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long lastForceCorrectTimeCurRun) {
+            if (logic == null) {
+                return false;
+            }
+            // The first file exists but is not available: a previous attempt to destroy it
+            // may have failed, so trigger a correction to delete it.
+            if (DefaultMessageStore.this.consumeQueueStore.isFirstFileExist(logic)
+                && !DefaultMessageStore.this.consumeQueueStore.isFirstFileAvailable(logic)) {
+                log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
+                        " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
+                        "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}"
+                    , logic.getTopic(), logic.getQueueId(), logic.getMaxPhysicOffset()
+                    , minPhyOffset, logic.getMinOffsetInQueue(), logic.getMaxOffsetInQueue(), logic.getCQType());
+                return true;
+            }
+
+            // logic.getMaxPhysicOffset() == -1 or minPhyOffset == -1 means there is no message
+            // in the current queue, so there is nothing to correct.
+            if (logic.getMaxPhysicOffset() == -1 || minPhyOffset == -1) {
+                return false;
+            }
+
+            if (logic.getMaxPhysicOffset() < minPhyOffset) {
+                if (logic.getMinOffsetInQueue() < logic.getMaxOffsetInQueue()) {
+                    // Every entry points below the commit log's minimum, yet the queue still
+                    // reports a non-empty offset range.
+                    log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is less than min phy offset: {}, " +
+                            "but min offset: {} is less than max offset: {}. topic:{}, queue:{}, cqType:{}."
+                        , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
+                        , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
+                    return true;
+                } else if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
+                    // Offset range is already empty.
+                    return false;
+                } else {
+                    log.error("CorrectLogicOffsetService.needCorrect. It should not happen, logic max phy offset: {} is less than min phy offset: {}," +
+                            " but min offset: {} is larger than max offset: {}. topic:{}, queue:{}, cqType:{}"
+                        , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
+                        , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
+                    return false;
+                }
+            }
+
+            // From here on logic.getMaxPhysicOffset() >= minPhyOffset. The earliest unit is only
+            // inspected once the configured force interval has elapsed since the time captured
+            // for this pass.
+            int forceCorrectInterval = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval();
+            if ((System.currentTimeMillis() - lastForceCorrectTimeCurRun) > forceCorrectInterval) {
+                lastForceCorrectTime = System.currentTimeMillis();
+                CqUnit cqUnit = logic.getEarliestUnit();
+                if (cqUnit == null) {
+                    if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
+                        return false;
+                    } else {
+                        log.error("CorrectLogicOffsetService.needCorrect. cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, " +
+                                "but min offset: {} is not equal to max offset: {}. topic:{}, queue:{}, cqType:{}."
+                            , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
+                            , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
+                        return true;
+                    }
+                }
+
+                if (cqUnit.getPos() < minPhyOffset) {
+                    log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is greater than min phy offset: {}, " +
+                            "but minPhyPos in cq is: {}. min offset in queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}."
+                        , logic.getMaxPhysicOffset(), minPhyOffset, cqUnit.getPos(), logic.getMinOffsetInQueue()
+                        , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
+                    return true;
+                }
+                // Otherwise cqUnit.getPos() >= minPhyOffset: normal case, no correction needed.
+            }
+
+            return false;
+        }
+
+        /**
+         * Walks every consume queue in the consume queue table and corrects those for which
+         * {@link #needCorrect} returns {@code true}.
+         */
+        private void correctLogicMinOffset() {
+            // Capture the timestamp once: needCorrect may update lastForceCorrectTime while the
+            // pass is running, and every queue in this pass must be compared against the same value.
+            long lastForceCorrectTimeCurRun = lastForceCorrectTime;
+            long minPhyOffset = getMinPhyOffset();
+            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
+            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
+                for (ConsumeQueueInterface logic : maps.values()) {
+                    if (Objects.equals(CQType.SimpleCQ, logic.getCQType())) {
+                        // Correction is not supported for SimpleCQ queues for now; skip them.
+                        continue;
+                    }
+                    if (needCorrect(logic, minPhyOffset, lastForceCorrectTimeCurRun)) {
+                        doCorrect(logic, minPhyOffset);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Asks the consume queue store to delete the expired files of the given queue, then
+         * optionally sleeps for the configured interval before returning.
+         *
+         * @param logic consume queue to correct
+         * @param minPhyOffset minimum physical offset passed through to {@code deleteExpiredFile}
+         */
+        private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
+            DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minPhyOffset);
+            int sleepIntervalWhenCorrectMinOffset = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
+            if (sleepIntervalWhenCorrectMinOffset > 0) {
+                try {
+                    Thread.sleep(sleepIntervalWhenCorrectMinOffset);
+                } catch (InterruptedException ignored) {
+                    // Thread.sleep clears the interrupt status when it throws; restore it so the
+                    // thread running this pass can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * @return the simple class name of this service, used as a prefix in log output
+         */
+        public String getServiceName() {
+            return CorrectLogicOffsetService.class.getSimpleName();
+        }
+    }
+
class FlushConsumeQueueService extends ServiceThread {
private static final int RETRY_TIMES_OVER = 3;
private long lastFlushTimestamp = 0;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 648a472..3400120 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -35,7 +35,6 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -482,17 +481,15 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCy
@Override
public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner,
MessageExtBrokerInner msg, short messageNum) {
- HashMap<String, Long> batchTopicQueueTable =
queueOffsetAssigner.getBatchTopicQueueTable();
String topicQueueKey = getTopic() + "-" + getQueueId();
- Long topicOffset = batchTopicQueueTable.computeIfAbsent(topicQueueKey,
k -> 0L);
+ long queueOffset =
queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum);
if (MessageSysFlag.check(msg.getSysFlag(),
MessageSysFlag.INNER_BATCH_FLAG)) {
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE,
String.valueOf(topicOffset));
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE,
String.valueOf(queueOffset));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
}
- msg.setQueueOffset(topicOffset);
- batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
+ msg.setQueueOffset(queueOffset);
}
boolean putBatchMessagePositionInfo(final long offset, final int size,
final long tagsCode, final long storeTime,
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index d3bfe75..d2d147c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -295,7 +295,7 @@ public class ConsumeQueueStore {
}
public Long getMaxOffset(String topic, int queueId) {
- return this.queueOffsetAssigner.getTopicQueueTable().get(topic + "-" +
queueId);
+ return this.queueOffsetAssigner.currentQueueOffset(topic + "-" +
queueId);
}
public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index 09e18ec..4ca1126 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashMap;
/**
- * QueueOffsetAssigner is a component for assigning queue.
+ * QueueOffsetAssigner is a component for assigning offsets for queues.
*
*/
public class QueueOffsetAssigner {
@@ -33,20 +33,24 @@ public class QueueOffsetAssigner {
private HashMap<String, Long> topicQueueTable = new HashMap<>(1024);
private HashMap<String, Long> batchTopicQueueTable = new HashMap<>(1024);
- public HashMap<String, Long> getTopicQueueTable() {
- return topicQueueTable;
+    /**
+     * Hands out the next queue offset for the given key and advances the recorded
+     * offset by {@code messageNum}.
+     *
+     * @param topicQueueKey key identifying the queue (callers build it as {@code topic-queueId})
+     * @param messageNum number of offsets to reserve
+     * @return the offset recorded before this call, or {@code 0} for a key seen for the first time
+     */
+    public long assignQueueOffset(String topicQueueKey, short messageNum) {
+        final Long recorded = this.topicQueueTable.get(topicQueueKey);
+        // A missing (or null) entry starts counting from zero.
+        final long assigned = recorded == null ? 0L : recorded;
+        this.topicQueueTable.put(topicQueueKey, assigned + messageNum);
+        return assigned;
     }
- public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
- this.topicQueueTable = topicQueueTable;
+    /**
+     * Hands out the next batch queue offset for the given key and advances the recorded
+     * batch offset by {@code messageNum}.
+     *
+     * @param topicQueueKey key identifying the queue (callers build it as {@code topic-queueId})
+     * @param messageNum number of offsets to reserve
+     * @return the batch offset recorded before this call, or {@code 0} for a key seen for the first time
+     */
+    public long assignBatchQueueOffset(String topicQueueKey, short messageNum) {
+        final Long recorded = this.batchTopicQueueTable.get(topicQueueKey);
+        // A missing (or null) entry starts counting from zero.
+        final long assigned = recorded == null ? 0L : recorded;
+        this.batchTopicQueueTable.put(topicQueueKey, assigned + messageNum);
+        return assigned;
     }
- public HashMap<String, Long> getBatchTopicQueueTable() {
- return batchTopicQueueTable;
+    /**
+     * Returns the offset currently recorded for the given key without modifying it.
+     *
+     * @param topicQueueKey key identifying the queue (callers build it as {@code topic-queueId})
+     * @return the recorded offset, or {@code 0} when nothing is recorded for the key
+     */
+    public long currentQueueOffset(String topicQueueKey) {
+        // HashMap.get returns null for an unknown key; unboxing that null into the primitive
+        // return type would throw a NullPointerException, so fall back to 0 explicitly.
+        Long recorded = this.topicQueueTable.get(topicQueueKey);
+        return recorded == null ? 0L : recorded;
     }
- public void setBatchTopicQueueTable(HashMap<String, Long>
batchTopicQueueTable) {
- this.batchTopicQueueTable = batchTopicQueueTable;
+    /**
+     * Returns the batch offset currently recorded for the given key without modifying it.
+     *
+     * @param topicQueueKey key identifying the queue (callers build it as {@code topic-queueId})
+     * @return the recorded batch offset, or {@code 0} when nothing is recorded for the key
+     */
+    public long currentBatchQueueOffset(String topicQueueKey) {
+        // HashMap.get returns null for an unknown key; unboxing that null into the primitive
+        // return type would throw a NullPointerException, so fall back to 0 explicitly.
+        Long recorded = this.batchTopicQueueTable.get(topicQueueKey);
+        return recorded == null ? 0L : recorded;
     }
public synchronized void remove(String topic, Integer queueId) {
@@ -57,4 +61,12 @@ public class QueueOffsetAssigner {
log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}",
topic, queueId);
}
+
+ /**
+ * Replaces the table of per-queue offsets with the given map.
+ * The map reference is stored as-is; no copy is made.
+ *
+ * @param topicQueueTable new offset table, keyed by the queue key
+ */
+ public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+ this.topicQueueTable = topicQueueTable;
+ }
+
+ /**
+ * Replaces the table of per-queue batch offsets with the given map.
+ * The map reference is stored as-is; no copy is made.
+ *
+ * @param batchTopicQueueTable new batch offset table, keyed by the queue key
+ */
+ public void setBatchTopicQueueTable(HashMap<String, Long>
batchTopicQueueTable) {
+ this.batchTopicQueueTable = batchTopicQueueTable;
+ }
}
\ No newline at end of file
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 9dad5ea..356e653 100644
---
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -336,6 +336,17 @@ public class DefaultMessageStoreCleanFilesTest {
}
}
+    /**
+     * Reads the {@code cleanCommitLogService} field of {@code messageStore} via reflection.
+     *
+     * @return the service instance currently held by the message store
+     * @throws Exception if the field cannot be located or read
+     */
+    private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService() throws Exception {
+        final Class<?> storeClass = messageStore.getClass();
+        final Field field = storeClass.getDeclaredField("cleanCommitLogService");
+        // Open the field only for the duration of the read.
+        field.setAccessible(true);
+        final Object raw = field.get(messageStore);
+        final DefaultMessageStore.CleanCommitLogService service = (DefaultMessageStore.CleanCommitLogService) raw;
+        field.setAccessible(false);
+
+        return service;
+    }
+
private DefaultMessageStore.CleanConsumeQueueService
getCleanConsumeQueueService()
throws Exception {
Field serviceField =
messageStore.getClass().getDeclaredField("cleanConsumeQueueService");
@@ -472,6 +483,7 @@ public class DefaultMessageStoreCleanFilesTest {
messageStore = new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("test"), new
MyMessageArrivingListener(), new BrokerConfig());
+ cleanCommitLogService = getCleanCommitLogService();
cleanConsumeQueueService = getCleanConsumeQueueService();
assertTrue(messageStore.load());
@@ -481,6 +493,15 @@ public class DefaultMessageStoreCleanFilesTest {
cleanCommitLogService = spy(cleanCommitLogService);
when(cleanCommitLogService.getDiskSpaceWarningLevelRatio()).thenReturn(diskSpaceCleanForciblyRatio);
when(cleanCommitLogService.getDiskSpaceCleanForciblyRatio()).thenReturn(diskSpaceCleanForciblyRatio);
+
+ putFiledBackToMessageStore(cleanCommitLogService);
+ }
+
+    /**
+     * Writes the given service instance into the {@code cleanCommitLogService} field of
+     * {@code messageStore} via reflection, so the store uses that instance.
+     *
+     * @param cleanCommitLogService instance to install into the message store
+     * @throws Exception if the field cannot be located or written
+     */
+    private void putFiledBackToMessageStore(DefaultMessageStore.CleanCommitLogService cleanCommitLogService) throws Exception {
+        final Field target = DefaultMessageStore.class.getDeclaredField("cleanCommitLogService");
+        // Open the field only for the duration of the write.
+        target.setAccessible(true);
+        target.set(messageStore, cleanCommitLogService);
+        target.setAccessible(false);
     }
private class MyMessageArrivingListener implements MessageArrivingListener
{