This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 36adf1251f [ISSUE #10017] Validate commitlog offset in
recoverAbnormally to prevent processing … (#10018)
36adf1251f is described below
commit 36adf1251ff5d3684083c2dbd2debf5ba2c104a8
Author: guyinyou <[email protected]>
AuthorDate: Tue Jan 20 14:44:22 2026 +0800
[ISSUE #10017] Validate commitlog offset in recoverAbnormally to prevent
processing … (#10018)
* validate commitlog offset in recoverAbnormally to prevent processing old
file data that passes CRC checks
Change-Id: If4b1881f82d26ce8d374472d73ec9ce3d51ba643
* fix
Change-Id: Idc4bf7ec476cc9b6529619c2aa9afd6a980b819c
* add checkCommitLogOffsetOnRecover in MessageStoreConfig
Change-Id: Iac9afbb8b3ffb03fa15890decaf502afbfa44cf9
---------
Co-authored-by: guyinyou <[email protected]>
---
.../main/java/org/apache/rocketmq/store/CommitLog.java | 18 +++++++++++++++---
.../rocketmq/store/config/MessageStoreConfig.java | 11 +++++++++++
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 286f31cd4a..3b92f1a745 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -738,6 +738,7 @@ public class CommitLog implements Swappable {
// recover by the minimum time stamp
boolean checkCRCOnRecover =
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
boolean checkDupInfo =
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
+ boolean checkCommitLogOffsetOnRecover =
this.defaultMessageStore.getMessageStoreConfig().isCheckCommitLogOffsetOnRecover();
int maxRecoverNum =
this.defaultMessageStore.getMessageStoreConfig().getCommitLogRecoverMaxNum();
if (maxRecoverNum <= 0) {
maxRecoverNum = 10;
@@ -792,8 +793,18 @@ public class CommitLog implements Swappable {
while (true) {
DispatchRequest dispatchRequest =
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
-
if (dispatchRequest.isSuccess()) {
+ // Check commitlog offset validity if enabled
+ if (checkCommitLogOffsetOnRecover) {
+ if (dispatchRequest.getCommitLogOffset() <
mappedFile.getFileFromOffset()
+ || dispatchRequest.getCommitLogOffset() >
mappedFile.getFileFromOffset() + mappedFile.getFileSize()) {
+ log.warn("found illegal commitlog offset {} in {},
current pos={}, it will be truncated.",
+ dispatchRequest.getCommitLogOffset(),
mappedFile.getFileName(), processOffset + mappedFileOffset);
+ log.info("recover physics file end, {} pos={}",
mappedFile.getFileName(), byteBuffer.position());
+
+ break;
+ }
+ }
// Normal data
if (size > 0) {
lastValidMsgPhyOffset = processOffset +
mappedFileOffset;
@@ -925,7 +936,8 @@ public class CommitLog implements Swappable {
return isMappedFileMatchedRecover(phyOffset, storeTimestamp,
recoverNormally);
}
- private boolean isMappedFileMatchedRecover(long phyOffset, long
storeTimestamp, boolean recoverNormally) throws RocksDBException {
+ private boolean isMappedFileMatchedRecover(long phyOffset, long
storeTimestamp,
+ boolean recoverNormally) throws RocksDBException {
boolean result =
this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset,
storeTimestamp, recoverNormally);
if (null != this.defaultMessageStore.getTransMessageRocksDBStore() &&
defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() &&
!defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable())
{
result = result &&
this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
@@ -2386,7 +2398,7 @@ public class CommitLog implements Swappable {
long costTime = this.systemClock.now() -
beginClockTimestamp;
log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime
> 30 * 1000 ? "NOTIFYME" : "OK", costTime);
} catch (Throwable e) {
- log.warn("{} service has e: ", this.getServiceName() , e);
+ log.warn("{} service has e: ", this.getServiceName(), e);
}
}
log.info("{} service end", this.getServiceName());
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 65dba5390d..d7f17efd64 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -194,6 +194,9 @@ public class MessageStoreConfig {
// This ensures no on-the-wire or on-disk corruption to the messages
occurred.
// This check adds some overhead,so it may be disabled in cases seeking
extreme performance.
private boolean checkCRCOnRecover = true;
+ // Whether check the commitlog offset validity during abnormal recovery.
+ // This helps detect and truncate old file data that may pass CRC checks
but contains invalid offsets.
+ private boolean checkCommitLogOffsetOnRecover = false;
// How many pages are to be flushed when flush CommitLog
private int flushCommitLogLeastPages = 4;
// How many pages are to be committed when commit data to file
@@ -796,6 +799,14 @@ public class MessageStoreConfig {
this.checkCRCOnRecover = checkCRCOnRecover;
}
+ public boolean isCheckCommitLogOffsetOnRecover() {
+ return checkCommitLogOffsetOnRecover;
+ }
+
+ public void setCheckCommitLogOffsetOnRecover(boolean
checkCommitLogOffsetOnRecover) {
+ this.checkCommitLogOffsetOnRecover = checkCommitLogOffsetOnRecover;
+ }
+
public boolean isForceVerifyPropCRC() {
return forceVerifyPropCRC;
}