This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 646e2a4942 [ISSUE #7355] fix dledger recover abnormally may lost
consume queue of tail (#7599)
646e2a4942 is described below
commit 646e2a4942c62eb36b1601a41ebc0828e8580804
Author: bxfjb <[email protected]>
AuthorDate: Thu Jan 18 15:43:01 2024 +0800
[ISSUE #7355] fix dledger recover abnormally may lost consume queue of tail
(#7599)
* fix: abnormal dledger recovery may lose the tail of the consume queue
* fix: use the correct storeTimestampPosition when bornhost is v6
* fix: use the correct SYSFLAG offset
---------
Co-authored-by: 赵宇晗 <[email protected]>
---
.../rocketmq/store/dledger/DLedgerCommitLog.java | 137 ++++++++++++++++++++-
.../store/dledger/DLedgerCommitlogTest.java | 40 ++++++
2 files changed, 172 insertions(+), 5 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 70371d83b8..27a18abc9d 100644
---
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -290,9 +290,9 @@ public class DLedgerCommitLog extends CommitLog {
return false;
}
- private void recover(long maxPhyOffsetOfConsumeQueue) throws
RocksDBException {
+ private void dledgerRecoverNormally(long maxPhyOffsetOfConsumeQueue)
throws RocksDBException {
dLedgerFileStore.load();
- if (dLedgerFileList.getMappedFiles().size() > 0) {
+ if (!dLedgerFileList.getMappedFiles().isEmpty()) {
dLedgerFileStore.recover();
dividedCommitlogOffset =
dLedgerFileList.getFirstMappedFile().getFileFromOffset();
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
@@ -309,9 +309,93 @@ public class DLedgerCommitLog extends CommitLog {
}
//Indicate that, it is the first time to load mixed commitlog, need to
recover the old commitlog
isInrecoveringOldCommitlog = true;
- //No need the abnormal recover
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;
+
+ setRecoverPosition();
+
+ }
+
+ private void dledgerRecoverAbnormally(long maxPhyOffsetOfConsumeQueue)
throws RocksDBException {
+ boolean checkCRCOnRecover =
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+ dLedgerFileStore.load();
+ if (!dLedgerFileList.getMappedFiles().isEmpty()) {
+ dLedgerFileStore.recover();
+ dividedCommitlogOffset =
dLedgerFileList.getFirstMappedFile().getFileFromOffset();
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+ if (mappedFile != null) {
+ disableDeleteDledger();
+ }
+ List<MmapFile> mmapFiles = dLedgerFileList.getMappedFiles();
+ int index = mmapFiles.size() - 1;
+ MmapFile mmapFile = null;
+ for (; index >= 0; index--) {
+ mmapFile = mmapFiles.get(index);
+ if (isMmapFileMatchedRecover(mmapFile)) {
+ log.info("dledger recover from this mappFile " +
mmapFile.getFileName());
+ break;
+ }
+ }
+
+ if (index < 0) {
+ index = 0;
+ mmapFile = mmapFiles.get(index);
+ }
+
+ ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
+ long processOffset = mmapFile.getFileFromOffset();
+ long mmapFileOffset = 0;
+ while (true) {
+ DispatchRequest dispatchRequest =
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, true);
+ int size = dispatchRequest.getMsgSize();
+
+ if (dispatchRequest.isSuccess()) {
+ if (size > 0) {
+ mmapFileOffset += size;
+ if
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+ if (dispatchRequest.getCommitLogOffset() <
this.defaultMessageStore.getConfirmOffset()) {
+
this.defaultMessageStore.doDispatch(dispatchRequest);
+ }
+ } else {
+
this.defaultMessageStore.doDispatch(dispatchRequest);
+ }
+ } else if (size == 0) {
+ index++;
+ if (index >= mmapFiles.size()) {
+ log.info("dledger recover physics file over, last
mapped file " + mmapFile.getFileName());
+ break;
+ } else {
+ mmapFile = mmapFiles.get(index);
+ byteBuffer = mmapFile.sliceByteBuffer();
+ processOffset = mmapFile.getFileFromOffset();
+ mmapFileOffset = 0;
+ log.info("dledger recover next physics file, " +
mmapFile.getFileName());
+ }
+ }
+ } else {
+ log.info("dledger recover physics file end, " +
mmapFile.getFileName() + " pos=" + byteBuffer.position());
+ break;
+ }
+ }
+
+ processOffset += mmapFileOffset;
+
+ if (maxPhyOffsetOfConsumeQueue >= processOffset) {
+ log.warn("dledger maxPhyOffsetOfConsumeQueue({}) >=
processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue,
processOffset);
+
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+ }
+ return;
+ }
+ isInrecoveringOldCommitlog = true;
+ super.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
+
+ isInrecoveringOldCommitlog = false;
+
+ setRecoverPosition();
+
+ }
+
+ private void setRecoverPosition() {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {
return;
@@ -343,14 +427,57 @@ public class DLedgerCommitLog extends CommitLog {
log.info("Will set the initial commitlog offset={} for dledger",
dividedCommitlogOffset);
}
+ private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) {
+ ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
+
+ int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET +
MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
+ if (magicCode != MESSAGE_MAGIC_CODE) {
+ return false;
+ }
+
+ int storeTimestampPosition;
+ int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET +
MessageDecoder.SYSFLAG_POSITION);
+ if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
+ storeTimestampPosition =
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+ } else {
+ // v6 address is 12 byte larger than v4
+ storeTimestampPosition =
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
+ }
+
+ long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET +
storeTimestampPosition);
+ if (storeTimestamp == 0) {
+ return false;
+ }
+
+ if
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
+ &&
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
+ if (storeTimestamp <=
this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+ log.info("dledger find check timestamp, {} {}",
+ storeTimestamp,
+ UtilAll.timeMillisToHumanString(storeTimestamp));
+ return true;
+ }
+ } else {
+ if (storeTimestamp <=
this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+ log.info("dledger find check timestamp, {} {}",
+ storeTimestamp,
+ UtilAll.timeMillisToHumanString(storeTimestamp));
+ return true;
+ }
+ }
+
+ return false;
+
+ }
+
@Override
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws
RocksDBException {
- recover(maxPhyOffsetOfConsumeQueue);
+ dledgerRecoverNormally(maxPhyOffsetOfConsumeQueue);
}
@Override
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws
RocksDBException {
- recover(maxPhyOffsetOfConsumeQueue);
+ dledgerRecoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
@Override
diff --git
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 234273b6af..1e4bbf21bd 100644
---
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
+
+import java.io.File;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
@@ -36,6 +38,8 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.StoreCheckpoint;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Assume;
@@ -146,6 +150,42 @@ public class DLedgerCommitlogTest extends
MessageStoreTestBase {
messageStore.shutdown();
}
}
+ @Test
+ public void testDLedgerAbnormallyRecover() throws Exception {
+ String base = createBaseDir();
+ String peers = String.format("n0-localhost:%d", nextPort());
+ String group = UUID.randomUUID().toString();
+ String topic = UUID.randomUUID().toString();
+
+ int messageNumPerQueue = 100;
+
+ DefaultMessageStore messageStore = createDledgerMessageStore(base,
group, "n0", peers, null, false, 0);
+ Thread.sleep(1000);
+ doPutMessages(messageStore, topic, 0, messageNumPerQueue, 0);
+ doPutMessages(messageStore, topic, 1, messageNumPerQueue, 0);
+ Thread.sleep(1000);
+ Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+ Assert.assertEquals(messageNumPerQueue,
messageStore.getMaxOffsetInQueue(topic, 0));
+ Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+ doGetMessages(messageStore, topic, 0, messageNumPerQueue, 0);
+ StoreCheckpoint storeCheckpoint = messageStore.getStoreCheckpoint();
+ storeCheckpoint.setPhysicMsgTimestamp(0);
+ storeCheckpoint.setLogicsMsgTimestamp(0);
+ messageStore.shutdown();
+
+ String fileName = StorePathConfigHelper.getAbortFile(base);
+ makeSureFileExists(fileName);
+
+ File file = new File(base + File.separator + "consumequeue" +
File.separator + topic + File.separator + "0" + File.separator +
"00000000000000001040");
+ file.delete();
+// truncateAllConsumeQueue(base + File.separator + "consumequeue" +
File.separator + topic + File.separator);
+ messageStore = createDledgerMessageStore(base, group, "n0", peers,
null, false, 0);
+ Thread.sleep(1000);
+ doGetMessages(messageStore, topic, 0, messageNumPerQueue, 0);
+ doGetMessages(messageStore, topic, 1, messageNumPerQueue, 0);
+ messageStore.shutdown();
+
+ }
@Test
public void testPutAndGetMessage() throws Exception {