suopovate commented on issue #7355:
URL: https://github.com/apache/rocketmq/issues/7355#issuecomment-1733959552

   Hello, I have a question.
   If the system exits abnormally, then on restart RocketMQ calls 
recoverAbnormally() to rebuild the ConsumeQueue and index data.
   So why do we need to fix the data? 🧐
   Here is the code:
   
       /**
        * Rescans the mapped files of this commit log, re-dispatches every message
        * that passes validation, and truncates whatever follows the last valid one.
        *
        * <p>Steps, as visible in this method:
        * <ol>
        *   <li>Walk the mapped files from the newest to the oldest and start at the
        *       first one accepted by {@code isMappedFileMatchedRecover}; if none is
        *       accepted, start at the first file.</li>
        *   <li>Read messages sequentially with {@code checkMessageAndReturnSize} and
        *       pass each successful result to {@code onCommitLogDispatch}. A
        *       successful result of size 0 moves the scan to the next file; an
        *       unsuccessful result ends the scan.</li>
        *   <li>Correct the confirm offset, set the flushed/committed positions to
        *       the end of the scanned data and truncate dirty files beyond it.</li>
        *   <li>If {@code maxPhyOffsetOfConsumeQueue} is at or beyond that end
        *       offset, truncate the dirty logic files as well.</li>
        * </ol>
        * When there are no mapped files at all, the flushed/committed positions are
        * reset to 0 and {@code destroyLogics()} is called.
        *
        * <p>NOTE(review): the method is marked {@code @Deprecated}; the intended
        * replacement is not visible in this snippet.
        *
        * @param maxPhyOffsetOfConsumeQueue offset compared against the recovered end
        *        offset of the commit log; presumably the largest physical offset
        *        referenced by the consume queue (TODO confirm against the caller)
        */
       @Deprecated
       public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
           // recover by the minimum time stamp
           boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
           boolean checkDupInfo = 
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
           final List<MappedFile> mappedFiles = 
this.mappedFileQueue.getMappedFiles();
           if (!mappedFiles.isEmpty()) {
               // Find the file to start recovering from, scanning from the newest
               // file backwards
               int index = mappedFiles.size() - 1;
               MappedFile mappedFile = null;
               for (; index >= 0; index--) {
                   mappedFile = mappedFiles.get(index);
                   if (this.isMappedFileMatchedRecover(mappedFile)) {
                       log.info("recover from this mapped file " + 
mappedFile.getFileName());
                       break;
                   }
               }
   
               // No file matched: fall back to scanning from the first file
               if (index < 0) {
                   index = 0;
                   mappedFile = mappedFiles.get(index);
               }
   
               ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
               // processOffset: start offset of the current file;
               // mappedFileOffset: bytes consumed so far inside the current file
               long processOffset = mappedFile.getFileFromOffset();
               long mappedFileOffset = 0;
               long lastValidMsgPhyOffset = processOffset;
               long lastConfirmValidMsgPhyOffset = processOffset;
               // abnormal recover require dispatching
               boolean doDispatch = true;
               while (true) {
                   DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
                   int size = dispatchRequest.getMsgSize();
   
                   if (dispatchRequest.isSuccess()) {
                       // Normal data
                       if (size > 0) {
                           // Record the START offset of this message before
                           // advancing past it
                           lastValidMsgPhyOffset = processOffset + 
mappedFileOffset;
                           mappedFileOffset += size;
   
                           // With duplication or controller mode enabled, only
                           // messages that end at or before the confirm offset
                           // are dispatched
                           if 
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() || 
this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
                               if (dispatchRequest.getCommitLogOffset() + size 
<= this.defaultMessageStore.getCommitLog().getConfirmOffset()) {
                                   
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, 
mappedFile, true, false);
                                   lastConfirmValidMsgPhyOffset = 
dispatchRequest.getCommitLogOffset() + size;
                               }
                           } else {
                               // it is the place that rebuild consumerQueue 
and index data...🤔
                               
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, 
mappedFile, true, false);
                           }
                       }
                       // Come the end of the file, switch to the next file
                       // Since the return 0 representatives met last hole, 
this can
                       // not be included in truncate offset
                       else if (size == 0) {
                           
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, 
mappedFile, true, true);
                           index++;
                           if (index >= mappedFiles.size()) {
                               // The current branch under normal circumstances 
should
                               // not happen
                               log.info("recover physics file over, last mapped 
file " + mappedFile.getFileName());
                               break;
                           } else {
                               // Restart the per-file bookkeeping for the next file
                               mappedFile = mappedFiles.get(index);
                               byteBuffer = mappedFile.sliceByteBuffer();
                               processOffset = mappedFile.getFileFromOffset();
                               mappedFileOffset = 0;
                               log.info("recover next physics file, " + 
mappedFile.getFileName());
                           }
                       }
                   } else {
                       // Validation failed: stop scanning here. A positive size
                       // is reported as a half-written message.
   
                       if (size > 0) {
                           log.warn("found a half message at {}, it will be 
truncated.", processOffset + mappedFileOffset);
                       }
   
                       log.info("recover physics file end, " + 
mappedFile.getFileName() + " pos=" + byteBuffer.position());
                       break;
                   }
               }
   
               // Absolute offset at which the scanned data ends
               processOffset += mappedFileOffset;
               if 
(this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
                   // Clamp the confirm offset into
                   // [minPhyOffset, lastConfirmValidMsgPhyOffset]
                   if (this.defaultMessageStore.getConfirmOffset() < 
this.defaultMessageStore.getMinPhyOffset()) {
                       log.error("confirmOffset {} is less than minPhyOffset 
{}, correct confirmOffset to minPhyOffset", 
this.defaultMessageStore.getConfirmOffset(), 
this.defaultMessageStore.getMinPhyOffset());
                       
this.defaultMessageStore.setConfirmOffset(this.defaultMessageStore.getMinPhyOffset());
                   } else if (this.defaultMessageStore.getConfirmOffset() > 
lastConfirmValidMsgPhyOffset) {
                       log.error("confirmOffset {} is larger than 
lastConfirmValidMsgPhyOffset {}, correct confirmOffset to 
lastConfirmValidMsgPhyOffset", this.defaultMessageStore.getConfirmOffset(), 
lastConfirmValidMsgPhyOffset);
                       
this.defaultMessageStore.setConfirmOffset(lastConfirmValidMsgPhyOffset);
                   }
               } else {
                   this.setConfirmOffset(lastValidMsgPhyOffset);
               }
               // Everything from processOffset onwards is treated as dirty
               this.mappedFileQueue.setFlushedWhere(processOffset);
               this.mappedFileQueue.setCommittedWhere(processOffset);
               this.mappedFileQueue.truncateDirtyFiles(processOffset);
   
               // Clear ConsumeQueue redundant data
               if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                   log.warn("maxPhyOffsetOfConsumeQueue({}) >= 
processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, 
processOffset);
                   
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
               }
           }
           // Commitlog case files are deleted
           else {
               log.warn("The commitlog files are deleted, and delete the 
consume queue files");
               this.mappedFileQueue.setFlushedWhere(0);
               this.mappedFileQueue.setCommittedWhere(0);
               this.defaultMessageStore.destroyLogics();
           }
       }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to