Erik1288 commented on code in PR #4439:
URL: https://github.com/apache/rocketmq/pull/4439#discussion_r903344776
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -321,53 +323,14 @@ public void start() throws Exception {
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
- /**
- * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
- * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
- * 3. Calculate the reput offset according to the consume queue;
- * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
- */
- long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
- maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
- }
- }
- }
- if (maxPhysicalPosInLogicQueue < 0) {
- maxPhysicalPosInLogicQueue = 0;
- }
- if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
- maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
- /**
- * This happens in following conditions:
- * 1. If someone removes all the consumequeue files or the disk get damaged.
- * 2. Launch a new broker, and copy the commitlog from other brokers.
- *
- * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
- * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
- */
- LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
- }
- LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
- maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
- this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+ // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
+ this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
Review Comment:
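In theory, once recover has finished, any lagging or incorrect CQ should already have been dealt with, so the reput offset is set directly to commitLog.getMaxOffset here. The ordinary CommitLog already works this way. However, DLedger's Abnormal recover implementation currently has some problems, so the offset still needs to be corrected for now. Once DLedger strictly conforms to the recover semantics, this correction logic can be removed.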
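To make the difference between the two strategies concrete, here is a minimal, self-contained sketch. It is not the RocketMQ implementation: the class ReputOffsetSketch and both method names are hypothetical, and plain long values stand in for what the real code reads from ConsumeQueueInterface.getMaxPhysicOffset() and CommitLog.getMinOffset()/getMaxOffset().

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; these names do not exist in RocketMQ.
final class ReputOffsetSketch {

    // Removed approach: start reput from the highest physical offset that any
    // consume queue has indexed, never going below the commit log's min offset.
    static long fromConsumeQueues(List<Long> cqMaxPhysicOffsets, long commitLogMinOffset) {
        long reputFrom = Math.max(commitLogMinOffset, 0L);
        for (long offset : cqMaxPhysicOffsets) {
            reputFrom = Math.max(reputFrom, offset);
        }
        return reputFrom;
    }

    // New approach: assume recover has already dispatched everything below the
    // commit log's max offset, so reput only needs to start from there.
    static long fromCommitLog(long commitLogMaxOffset) {
        return commitLogMaxOffset;
    }

    public static void main(String[] args) {
        // Assumed scenario: the commit log ends at 300, but the consume
        // queues have only been built up to 250.
        List<Long> cqOffsets = Arrays.asList(100L, 250L, 180L);
        System.out.println("from consume queues: " + fromConsumeQueues(cqOffsets, 0L));
        System.out.println("from commit log:     " + fromCommitLog(300L));
    }
}
```

In the assumed scenario the two strategies disagree by 50 bytes of commit log. The new approach is only safe if recover really has dispatched that range, which is the guarantee the comment says DLedger's abnormal recover does not yet fully provide.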