lollipopjin commented on code in PR #8823:
URL: https://github.com/apache/rocketmq/pull/8823#discussion_r1814814545
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -3418,6 +3376,115 @@ private boolean validateBlackListConfigExist(Properties
properties) {
return false;
}
+ private CheckRocksdbCqWriteResult
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand
request) throws RemotingCommandException {
+ CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+ String requestTopic = requestHeader.getTopic();
+ MessageStore messageStore = brokerController.getMessageStore();
+ DefaultMessageStore defaultMessageStore;
+ if (messageStore instanceof AbstractPluginMessageStore) {
+ defaultMessageStore = (DefaultMessageStore)
((AbstractPluginMessageStore) messageStore).getNext();
+ } else {
+ defaultMessageStore = (DefaultMessageStore) messageStore;
+ }
+ RocksDBMessageStore rocksDBMessageStore =
defaultMessageStore.getRocksDBMessageStore();
+ CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
+ if
(defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType()))
{
+ result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need
check");
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
+ return result;
+ }
+
+ if
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+ result.setCheckResult("rocksdbCQWriteEnable is false,
checkRocksdbCqWriteProgressCommand is invalid");
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ return result;
+ }
+
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
cqTable = defaultMessageStore.getConsumeQueueTable();
+ StringBuilder diffResult = new StringBuilder();
+ try {
+ if (StringUtils.isNotBlank(requestTopic)) {
+ boolean checkResult =
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic,
rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
+ result.setCheckResult(diffResult.toString());
+ result.setCheckStatus(checkResult ?
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() :
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ return result;
+ }
+ boolean checkResult = true;
+ int successNum = 0;
+ for (Map.Entry<String, ConcurrentMap<Integer,
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+ String topic = topicEntry.getKey();
+ checkResult =
processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore,
diffResult, false, requestHeader.getCheckStoreTime());
+ if (checkResult) {
+ successNum++;
+ }
+ }
+ // check all topic finish, all topic is ready -> ready
+ // check all topic finish, success/all : 89/100 -> not ready
+ String checkResultString = successNum == cqTable.size() ? "all
topic is ready" : String.format("success/all : %s/%s", successNum,
cqTable.size());
+ diffResult.append("check all topic finish,
").append(checkResultString);
+ result.setCheckResult(diffResult.toString());
+ result.setCheckStatus(checkResult ?
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() :
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ } catch (Exception e) {
+ LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+ result.setCheckResult(e.getMessage() +
Arrays.toString(e.getStackTrace()));
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
+ }
+ return result;
+ }
+
+ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore
rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long
checkpointByStoreTime) {
+ for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
+ Integer queueId = queueEntry.getKey();
+ ConsumeQueueInterface jsonCq = queueEntry.getValue();
+ ConsumeQueueInterface kvCq =
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+ if (printDetail) {
+ String format = String.format("[topic: %s, queue: %s] \n
kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+ topic, queueId, kvCq.getEarliestUnit(),
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+ diffResult.append(format).append("\n");
+ }
+
+ long minOffsetByTime = 0L;
+ try {
+ minOffsetByTime =
rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic,
queueId, checkpointByStoreTime, BoundaryType.UPPER);
+ } catch (Exception e) {
+ // ignore
+ }
+ long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+ long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime);
+ long checkTo = jsonCq.getMaxOffsetInQueue();
+ // The latest message is earlier than the check time
+ Pair<CqUnit, Long> fileLatestCq =
jsonCq.getCqUnitAndStoreTime(checkTo);
+ if (fileLatestCq != null) {
+ if (fileLatestCq.getObject2() < checkpointByStoreTime) {
+ continue;
+ }
+ }
+
+ for (long i = checkFrom; i < checkTo; i++) {
+ Pair<CqUnit, Long> fileCqUnit =
jsonCq.getCqUnitAndStoreTime(i);
+ Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
+ if (fileCqUnit == null || kvCqUnit == null) {
+ String format = String.format("[topic: %s, queue: %s,
offset: %s] \n kv : %s \n file : %s \n",
+ topic, queueId, i, kvCqUnit != null ?
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() :
"null");
+ LOGGER.error(format);
+ break;
Review Comment:
If we `break` here, does the check result still end up as true?
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -469,74 +475,26 @@ private RemotingCommand
updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext
ctx, RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
- String requestTopic = requestHeader.getTopic();
- final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- response.setCode(ResponseCode.SUCCESS);
- MessageStore messageStore = brokerController.getMessageStore();
- DefaultMessageStore defaultMessageStore;
- if (messageStore instanceof AbstractPluginMessageStore) {
- defaultMessageStore = (DefaultMessageStore)
((AbstractPluginMessageStore) messageStore).getNext();
+ CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
+ if (requestHeader.isAsync()) {
Review Comment:
Remove the async switch; by default, all check requests should run
asynchronously to avoid blocking the admin threads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]