lollipopjin commented on code in PR #8823:
URL: https://github.com/apache/rocketmq/pull/8823#discussion_r1814167555
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -3418,6 +3375,103 @@ private boolean validateBlackListConfigExist(Properties
properties) {
return false;
}
+ private CheckRocksdbCqWriteResult
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand
request) throws RemotingCommandException {
+ CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+ String requestTopic = requestHeader.getTopic();
+ MessageStore messageStore = brokerController.getMessageStore();
+ DefaultMessageStore defaultMessageStore;
+ if (messageStore instanceof AbstractPluginMessageStore) {
+ defaultMessageStore = (DefaultMessageStore)
((AbstractPluginMessageStore) messageStore).getNext();
+ } else {
+ defaultMessageStore = (DefaultMessageStore) messageStore;
+ }
+ RocksDBMessageStore rocksDBMessageStore =
defaultMessageStore.getRocksDBMessageStore();
+ CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
+ if
(defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType()))
{
+ result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need
check");
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
+ return result;
+ }
+
+ if
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+ result.setCheckResult("rocksdbCQWriteEnable is false,
checkRocksdbCqWriteProgressCommand is invalid");
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ return result;
+ }
+
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
cqTable = defaultMessageStore.getConsumeQueueTable();
+ StringBuilder diffResult = new StringBuilder();
+ try {
+ if (StringUtils.isNotBlank(requestTopic)) {
+ boolean checkResult =
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic,
rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
+ result.setCheckResult(diffResult.toString());
+ result.setCheckStatus(checkResult ?
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() :
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ return result;
+ }
+ boolean checkResult = true;
+ for (Map.Entry<String, ConcurrentMap<Integer,
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+ String topic = topicEntry.getKey();
+ checkResult =
processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore,
diffResult, true, requestHeader.getCheckStoreTime());
+ if (!checkResult) {
+ break;
+ }
+ }
+ diffResult.append("check all topic successful,
size:").append(cqTable.size());
+ result.setCheckResult(diffResult.toString());
+ result.setCheckStatus(checkResult ?
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() :
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ } catch (Exception e) {
+ LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+ result.setCheckResult(e.getMessage() +
Arrays.toString(e.getStackTrace()));
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
+ }
+ return result;
+ }
+
+ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore
rocksDBMessageStore, StringBuilder diffResult, boolean checkAll, long
checkStoreTime) {
+ for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
+ Integer queueId = queueEntry.getKey();
+ ConsumeQueueInterface jsonCq = queueEntry.getValue();
+ ConsumeQueueInterface kvCq =
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+ if (!checkAll) {
+ String format = String.format("[topic: %s, queue: %s] \n
kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+ topic, queueId, kvCq.getEarliestUnit(),
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+ diffResult.append(format).append("\n");
+ }
+ long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
+ long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+
+ // The latest message is earlier than the check time
+ Pair<CqUnit, Long> fileLatestCq =
jsonCq.getCqUnitAndStoreTime(maxFileOffsetInQueue);
+ if (fileLatestCq != null) {
+ if (fileLatestCq.getObject2() < checkStoreTime) {
+ continue;
+ }
+ }
+
+ for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
Review Comment:
Should start from the checkpointByStoreTime to reduce unnecessary Iterate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]