This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-alpha in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 78a3ed7b0ef1a2d997dd58168f2a082423fe6301 Author: Hongjian Fei <[email protected]> AuthorDate: Mon Jan 10 18:44:12 2022 +0800 [ISSUE #3708] add CorrectLogicOffsetService to periodically correct min logic offset (#3722) * [ISSUE #3708] add CorrectLogicOffsetService to periodically correct min logic offset; refactor QueueOffsetAssigner. * Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio to get around configuration protection in unit-test. * Fix check style. --- .../apache/rocketmq/common/attribute/CQType.java | 3 +- .../org/apache/rocketmq/store/ConsumeQueue.java | 9 +- .../apache/rocketmq/store/DefaultMessageStore.java | 124 +++++++++++++++++++++ .../rocketmq/store/queue/BatchConsumeQueue.java | 9 +- .../rocketmq/store/queue/ConsumeQueueStore.java | 2 +- .../rocketmq/store/queue/QueueOffsetAssigner.java | 30 +++-- .../store/DefaultMessageStoreCleanFilesTest.java | 21 ++++ 7 files changed, 173 insertions(+), 25 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java index 6bd6ad2..73ef218 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java @@ -19,6 +19,5 @@ package org.apache.rocketmq.common.attribute; public enum CQType { SimpleCQ, - BatchCQ, - MillionCQ; + BatchCQ } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 7763a0f..a1fc870 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.store; import java.io.File; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -443,12 +442,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { @Override public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) { String topicQueueKey = getTopic() + "-" + getQueueId(); - HashMap<String, Long> topicQueueTable = queueOffsetAssigner.getTopicQueueTable(); - - long topicOffset = topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L); - topicQueueTable.put(topicQueueKey, topicOffset + messageNum); - - msg.setQueueOffset(topicOffset); + long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum); + msg.setQueueOffset(queueOffset); } private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index da47be6..76165d9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -49,6 +50,7 @@ import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -94,6 +96,8 @@ public class DefaultMessageStore implements MessageStore { private final CleanConsumeQueueService cleanConsumeQueueService; + private final CorrectLogicOffsetService correctLogicOffsetService; + private final IndexService indexService; private final AllocateMappedFileService allocateMappedFileService; @@ -156,6 +160,7 @@ public class DefaultMessageStore implements MessageStore { this.flushConsumeQueueService = new FlushConsumeQueueService(); this.cleanCommitLogService = new CleanCommitLogService(); this.cleanConsumeQueueService = new CleanConsumeQueueService(); + this.correctLogicOffsetService = new CorrectLogicOffsetService(); this.storeStatsService = new StoreStatsService(); this.indexService = new IndexService(this); if (!messageStoreConfig.isEnableDLegerCommitLog()) { @@ -1351,6 +1356,8 @@ public class DefaultMessageStore implements MessageStore { long deleteCount = 0L; deleteCount += this.cleanCommitLogService.run(); deleteCount += this.cleanConsumeQueueService.run(); + + this.correctLogicOffsetService.run(); return deleteCount; } @@ -1879,6 +1886,123 @@ public class DefaultMessageStore implements MessageStore { } } + class CorrectLogicOffsetService { + private long lastForceCorrectTime = -1L; + + public void run() { + try { + this.correctLogicMinOffset(); + } catch (Throwable e) { + log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long lastForeCorrectTimeCurRun) { + if (logic == null) { + return false; + } + // If first exist and not available, it means first file may destroy failed, delete it. + if (DefaultMessageStore.this.consumeQueueStore.isFirstFileExist(logic) && !DefaultMessageStore.this.consumeQueueStore.isFirstFileAvailable(logic)) { + log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." + + " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " + + "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}" + , logic.getTopic(), logic.getQueueId(), logic.getMaxPhysicOffset() + , minPhyOffset, logic.getMinOffsetInQueue(), logic.getMaxOffsetInQueue(), logic.getCQType()); + return true; + } + + // logic.getMaxPhysicOffset() or minPhyOffset = -1 + // means there is no message in current queue, so no need to correct. + if (logic.getMaxPhysicOffset() == -1 || minPhyOffset == -1) { + return false; + } + + if (logic.getMaxPhysicOffset() < minPhyOffset) { + if (logic.getMinOffsetInQueue() < logic.getMaxOffsetInQueue()) { + log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is less than min phy offset: {}, " + + "but min offset: {} is less than max offset: {}. topic:{}, queue:{}, cqType:{}." + , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue() + , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType()); + return true; + } else if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) { + return false; + } else { + log.error("CorrectLogicOffsetService.needCorrect. It should not happen, logic max phy offset: {} is less than min phy offset: {}," + + " but min offset: {} is larger than max offset: {}. topic:{}, queue:{}, cqType:{}" + , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue() + , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType()); + return false; + } + } + //the logic.getMaxPhysicOffset() >= minPhyOffset + int forceCorrectInterval = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval(); + if ((System.currentTimeMillis() - lastForeCorrectTimeCurRun) > forceCorrectInterval) { + lastForceCorrectTime = System.currentTimeMillis(); + CqUnit cqUnit = logic.getEarliestUnit(); + if (cqUnit == null) { + if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) { + return false; + } else { + log.error("CorrectLogicOffsetService.needCorrect. cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, " + + "but min offset: {} is not equal to max offset: {}. topic:{}, queue:{}, cqType:{}." + , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue() + , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType()); + return true; + } + } + + if (cqUnit.getPos() < minPhyOffset) { + log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is greater than min phy offset: {}, " + + "but minPhyPos in cq is: {}. min offset in queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}." + , logic.getMaxPhysicOffset(), minPhyOffset, cqUnit.getPos(), logic.getMinOffsetInQueue() + , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType()); + return true; + } + + if (cqUnit.getPos() >= minPhyOffset) { + + // Normal case, do not need correct. + return false; + } + } + + return false; + } + + private void correctLogicMinOffset() { + + long lastForeCorrectTimeCurRun = lastForceCorrectTime; + long minPhyOffset = getMinPhyOffset(); + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable(); + for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) { + for (ConsumeQueueInterface logic : maps.values()) { + if (Objects.equals(CQType.SimpleCQ, logic.getCQType())) { + // cq is not supported for now. + continue; + } + if (needCorrect(logic, minPhyOffset, lastForeCorrectTimeCurRun)) { + doCorrect(logic, minPhyOffset); + } + } + } + } + + private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) { + DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minPhyOffset); + int sleepIntervalWhenCorrectMinOffset = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval(); + if (sleepIntervalWhenCorrectMinOffset > 0) { + try { + Thread.sleep(sleepIntervalWhenCorrectMinOffset); + } catch (InterruptedException ignored) { + } + } + } + + public String getServiceName() { + return CorrectLogicOffsetService.class.getSimpleName(); + } + } + class FlushConsumeQueueService extends ServiceThread { private static final int RETRY_TIMES_OVER = 3; private long lastFlushTimestamp = 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 648a472..3400120 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -35,7 +35,6 @@ import org.apache.rocketmq.store.logfile.MappedFile; import java.io.File; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; @@ -482,17 +481,15 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy @Override public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) { - HashMap<String, Long> batchTopicQueueTable = queueOffsetAssigner.getBatchTopicQueueTable(); String topicQueueKey = getTopic() + "-" + getQueueId(); - Long topicOffset = batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L); + long queueOffset = queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum); if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(topicOffset)); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(queueOffset)); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); } - msg.setQueueOffset(topicOffset); - batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum); + msg.setQueueOffset(queueOffset); } boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime, diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index d3bfe75..d2d147c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -295,7 +295,7 @@ public class ConsumeQueueStore { } public Long getMaxOffset(String topic, int queueId) { - return this.queueOffsetAssigner.getTopicQueueTable().get(topic + "-" + queueId); + return this.queueOffsetAssigner.currentQueueOffset(topic + "-" + queueId); } public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) { diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java index 09e18ec..4ca1126 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import java.util.HashMap; /** - * QueueOffsetAssigner is a component for assigning queue. + * QueueOffsetAssigner is a component for assigning offsets for queues. * */ public class QueueOffsetAssigner { @@ -33,20 +33,24 @@ public class QueueOffsetAssigner { private HashMap<String, Long> topicQueueTable = new HashMap<>(1024); private HashMap<String, Long> batchTopicQueueTable = new HashMap<>(1024); - public HashMap<String, Long> getTopicQueueTable() { - return topicQueueTable; + public long assignQueueOffset(String topicQueueKey, short messageNum) { + long queueOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L); + this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum); + return queueOffset; } - public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) { - this.topicQueueTable = topicQueueTable; + public long assignBatchQueueOffset(String topicQueueKey, short messageNum) { + Long topicOffset = this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L); + this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum); + return topicOffset; } - public HashMap<String, Long> getBatchTopicQueueTable() { - return batchTopicQueueTable; + public long currentQueueOffset(String topicQueueKey) { + return this.topicQueueTable.get(topicQueueKey); } - public void setBatchTopicQueueTable(HashMap<String, Long> batchTopicQueueTable) { - this.batchTopicQueueTable = batchTopicQueueTable; + public long currentBatchQueueOffset(String topicQueueKey) { + return this.batchTopicQueueTable.get(topicQueueKey); } public synchronized void remove(String topic, Integer queueId) { @@ -57,4 +61,12 @@ public class QueueOffsetAssigner { log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); } + + public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) { + this.topicQueueTable = topicQueueTable; + } + + public void setBatchTopicQueueTable(HashMap<String, Long> batchTopicQueueTable) { + this.batchTopicQueueTable = batchTopicQueueTable; + } } \ No newline at end of file diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 9dad5ea..356e653 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -336,6 +336,17 @@ public class DefaultMessageStoreCleanFilesTest { } } + private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService() + throws Exception { + Field serviceField = messageStore.getClass().getDeclaredField("cleanCommitLogService"); + serviceField.setAccessible(true); + DefaultMessageStore.CleanCommitLogService cleanCommitLogService = + (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore); + serviceField.setAccessible(false); + + return cleanCommitLogService; + } + private DefaultMessageStore.CleanConsumeQueueService getCleanConsumeQueueService() throws Exception { Field serviceField = messageStore.getClass().getDeclaredField("cleanConsumeQueueService"); @@ -472,6 +483,7 @@ public class DefaultMessageStoreCleanFilesTest { messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + cleanCommitLogService = getCleanCommitLogService(); cleanConsumeQueueService = getCleanConsumeQueueService(); assertTrue(messageStore.load()); @@ -481,6 +493,15 @@ public class DefaultMessageStoreCleanFilesTest { cleanCommitLogService = spy(cleanCommitLogService); when(cleanCommitLogService.getDiskSpaceWarningLevelRatio()).thenReturn(diskSpaceCleanForciblyRatio); when(cleanCommitLogService.getDiskSpaceCleanForciblyRatio()).thenReturn(diskSpaceCleanForciblyRatio); + + putFiledBackToMessageStore(cleanCommitLogService); + } + + private void putFiledBackToMessageStore(DefaultMessageStore.CleanCommitLogService cleanCommitLogService) throws Exception { + Field cleanCommitLogServiceField = DefaultMessageStore.class.getDeclaredField("cleanCommitLogService"); + cleanCommitLogServiceField.setAccessible(true); + cleanCommitLogServiceField.set(messageStore, cleanCommitLogService); + cleanCommitLogServiceField.setAccessible(false); } private class MyMessageArrivingListener implements MessageArrivingListener {
