RongtongJin commented on code in PR #4439:
URL: https://github.com/apache/rocketmq/pull/4439#discussion_r898608023
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -309,6 +300,17 @@ public boolean load() {
*/
@Override
public void start() throws Exception {
+ if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+ this.haService.init(this);
+ }
+
+ if (messageStoreConfig.isTransientStorePoolEnable()) {
+ this.transientStorePool.init();
+ }
Review Comment:
Why put the init operation in the start function?
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -321,53 +323,14 @@ public void start() throws Exception {
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
- /**
- * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
- * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
- * 3. Calculate the reput offset according to the consume queue;
- * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
- */
- long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
- maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
- }
- }
- }
- if (maxPhysicalPosInLogicQueue < 0) {
- maxPhysicalPosInLogicQueue = 0;
- }
- if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
- maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
- /**
- * This happens in following conditions:
- * 1. If someone removes all the consumequeue files or the disk get damaged.
- * 2. Launch a new broker, and copy the commitlog from other brokers.
- *
- * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
- * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
- */
- LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
- }
- LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
- maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
- this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+ // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
+ this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
Review Comment:
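If ReputFromOffset is first set to the max offset and only corrected afterwards, could that prevent some earlier consume queues (cq) from being built successfully?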
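
To make the concern concrete, here is a minimal sketch under a simplified model. `ReputOffsetSketch`, its method names, and the sample offsets are hypothetical and are not RocketMQ APIs. It reproduces the removed calculation (start reput from the largest physical offset any consume queue has indexed) and measures how many commit log bytes would never be re-dispatched if reput instead starts at the commit log max offset and recover has not already dispatched them.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the reput start offset; not the RocketMQ classes.
public class ReputOffsetSketch {

    // Mirrors the removed logic: take the largest physical offset indexed by any
    // consume queue, but never go below the commit log min offset or below 0.
    static long reputOffsetFromConsumeQueues(Map<String, List<Long>> maxPhysicOffsetsByTopic,
                                             long commitLogMinOffset) {
        long maxPhysicalPosInLogicQueue = commitLogMinOffset;
        for (List<Long> offsets : maxPhysicOffsetsByTopic.values()) {
            for (long offset : offsets) {
                if (offset > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = offset;
                }
            }
        }
        return Math.max(0L, maxPhysicalPosInLogicQueue);
    }

    // Bytes in [cqBasedOffset, commitLogMaxOffset) that reput skips when it starts
    // at the commit log max offset instead of the consume-queue-based offset.
    static long bytesSkippedByStartingAtMax(long cqBasedOffset, long commitLogMaxOffset) {
        return Math.max(0L, commitLogMaxOffset - cqBasedOffset);
    }

    public static void main(String[] args) {
        // Assumed sample state: two queues indexed up to 600 and 800, commit log ends at 1000.
        Map<String, List<Long>> cq = Map.of("TopicA", List.of(600L, 800L));
        long oldStart = reputOffsetFromConsumeQueues(cq, 0L);
        long skipped = bytesSkippedByStartingAtMax(oldStart, 1000L);
        System.out.println("old reput start = " + oldStart + ", bytes skipped by new start = " + skipped);
    }
}
```

In this sample state the skipped range is non-empty, so the change is only safe if recover is guaranteed to have dispatched everything up to the commit log max offset before `start()` runs.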