Erik1288 commented on code in PR #4439:
URL: https://github.com/apache/rocketmq/pull/4439#discussion_r903344776


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -321,53 +323,14 @@ public void start() throws Exception {
         if (this.getMessageStoreConfig().isDuplicationEnable()) {
             
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
         } else {
-            /**
-             * 1. Make sure the fast-forward messages to be truncated during 
the recovering according to the max physical offset of the commitlog;
-             * 2. DLedger committedPos may be missing, so the 
maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by 
DLedgerCommitLog, just let it go;
-             * 3. Calculate the reput offset according to the consume queue;
-             * 4. Make sure the fall-behind messages to be dispatched before 
starting the commitlog, especially when the broker role are automatically 
changed.
-             */
-            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
this.getConsumeQueueTable().values()) {
-                for (ConsumeQueueInterface logic : maps.values()) {
-                    if (logic.getMaxPhysicOffset() > 
maxPhysicalPosInLogicQueue) {
-                        maxPhysicalPosInLogicQueue = 
logic.getMaxPhysicOffset();
-                    }
-                }
-            }
-            if (maxPhysicalPosInLogicQueue < 0) {
-                maxPhysicalPosInLogicQueue = 0;
-            }
-            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
-                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
-                /**
-                 * This happens in following conditions:
-                 * 1. If someone removes all the consumequeue files or the 
disk get damaged.
-                 * 2. Launch a new broker, and copy the commitlog from other 
brokers.
-                 *
-                 * All the conditions has the same in common that the 
maxPhysicalPosInLogicQueue should be 0.
-                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe 
something wrong.
-                 */
-                LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} 
clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
-            }
-            LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} 
clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
-                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), 
this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
-            
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+            // It is [recover]'s responsibility to fully dispatch the commit 
log data before the max offset of commit log.
+            
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
         }
         this.reputMessageService.start();

Review Comment:
   
理论上,在recover结束后,落后或者错误的CQ就应该被处理完毕,这里直接设置为commitLog.getMaxOffset。目前普通的CommitLog也是这么实现的。只是现在DLedger在Abnormal
 recover的实现上有点问题,所以位点还需要修正下,等DLedger严格符合recover的语义后,这个修正的逻辑可以去掉的。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to