suopovate commented on issue #7355:
URL: https://github.com/apache/rocketmq/issues/7355#issuecomment-1733959552
Hello,I have a question.
If System is exit abnormal, when restart, rocketMq will call
recoverAbnormally() to recover consumerQueue and index data.
So why we need to fix the data? 🧐
That's the code :
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover =
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
boolean checkDupInfo =
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
final List<MappedFile> mappedFiles =
this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " +
mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = processOffset;
long lastConfirmValidMsgPhyOffset = processOffset;
// abnormal recover require dispatching
boolean doDispatch = true;
while (true) {
DispatchRequest dispatchRequest =
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
lastValidMsgPhyOffset = processOffset +
mappedFileOffset;
mappedFileOffset += size;
if
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() ||
this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (dispatchRequest.getCommitLogOffset() + size
<= this.defaultMessageStore.getCommitLog().getConfirmOffset()) {
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch,
mappedFile, true, false);
lastConfirmValidMsgPhyOffset =
dispatchRequest.getCommitLogOffset() + size;
}
} else {
// it is the place that rebuild consumerQueue
and index data...🤔
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch,
mappedFile, true, false);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole,
this can
// not be included in truncate offset
else if (size == 0) {
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch,
mappedFile, true, true);
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances
should
// not happen
log.info("recover physics file over, last mapped
file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " +
mappedFile.getFileName());
}
}
} else {
if (size > 0) {
log.warn("found a half message at {}, it will be
truncated.", processOffset + mappedFileOffset);
}
log.info("recover physics file end, " +
mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
processOffset += mappedFileOffset;
if
(this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getConfirmOffset() <
this.defaultMessageStore.getMinPhyOffset()) {
log.error("confirmOffset {} is less than minPhyOffset
{}, correct confirmOffset to minPhyOffset",
this.defaultMessageStore.getConfirmOffset(),
this.defaultMessageStore.getMinPhyOffset());
this.defaultMessageStore.setConfirmOffset(this.defaultMessageStore.getMinPhyOffset());
} else if (this.defaultMessageStore.getConfirmOffset() >
lastConfirmValidMsgPhyOffset) {
log.error("confirmOffset {} is larger than
lastConfirmValidMsgPhyOffset {}, correct confirmOffset to
lastConfirmValidMsgPhyOffset", this.defaultMessageStore.getConfirmOffset(),
lastConfirmValidMsgPhyOffset);
this.defaultMessageStore.setConfirmOffset(lastConfirmValidMsgPhyOffset);
}
} else {
this.setConfirmOffset(lastValidMsgPhyOffset);
}
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >=
processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue,
processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the
consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]