This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2e6632ff40 [ISSUE #10417] Fix stale minOffset after consumeQueue
truncation (#10418)
2e6632ff40 is described below
commit 2e6632ff404b6210057914cf0530f53d6531ab99
Author: chenxu80 <[email protected]>
AuthorDate: Tue Jun 23 17:10:16 2026 +0800
[ISSUE #10417] Fix stale minOffset after consumeQueue truncation (#10418)
---
.../org/apache/rocketmq/store/ConsumeQueue.java | 17 +++---
.../apache/rocketmq/store/ConsumeQueueTest.java | 62 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 8 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 5cba37b8ce..0d698dacfe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -552,17 +552,18 @@ public class ConsumeQueue implements
ConsumeQueueInterface {
*/
@Override
public void correctMinOffset(long minCommitLogOffset) {
- // Check if the consume queue is the state of deprecation.
- if (minLogicOffset >= mappedFileQueue.getMaxOffset()) {
- log.info("ConsumeQueue[Topic={}, queue-id={}] contains no valid
entries", topic, queueId);
+ MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
+ if (null == lastMappedFile) {
+ log.info("ConsumeQueue[Topic={}, queue-id={}] contains no entry,"
+ + " reset minLogicOffset to 0, maxPhysicOffset to -1",
topic, queueId);
+ this.minLogicOffset = 0;
+ this.maxPhysicOffset = -1;
return;
}
- // Check whether the consume queue maps no valid data at all. This
check may cost 1 IO operation.
- // The rationale is that consume queue always preserves the last file.
In case there are many deprecated topics,
- // This check would save a lot of efforts.
- MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
- if (null == lastMappedFile) {
+ // Check if the consume queue is the state of deprecation.
+ if (minLogicOffset > mappedFileQueue.getMaxOffset()) {
+ log.warn("ConsumeQueue[Topic={}, queue-id={}] contains no valid
entries", topic, queueId);
return;
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 3ad66b3cef..e8e3797d02 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
+import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -713,4 +714,65 @@ public class ConsumeQueueTest {
consumeQueue.destroy();
FileUtils.deleteDirectory(tmpDir);
}
+
+ @Test
+ public void testCorrectMinOffsetAfterAllFilesDeleted() throws IOException {
+ String topic = "T1";
+ int queueId = 0;
+ MessageStoreConfig storeConfig = new MessageStoreConfig();
+ File tmpDir =
Files.createTempDirectory("testCorrectMinOffsetAfterAllFilesDeleted").toFile();
+ storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+ storeConfig.setEnableConsumeQueueExt(false);
+ DefaultMessageStore messageStore =
Mockito.mock(DefaultMessageStore.class);
+
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+ RunningFlags runningFlags = new RunningFlags();
+ Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+ StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+ ConsumeQueue consumeQueue = null;
+ ConsumeQueue reloadedConsumeQueue = null;
+ try {
+ consumeQueue = new ConsumeQueue(topic, queueId,
storeConfig.getStorePathRootDir(),
+ storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+ int max = 10;
+ int messageSize = 100;
+ for (int i = 0; i < max; ++i) {
+ DispatchRequest dispatchRequest = new DispatchRequest(topic,
queueId, messageSize * i, messageSize, 0, 0, i,
+ null, null, 0, 0, null);
+ consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+ }
+
+ File consumeQueueDir = new File(storeConfig.getStorePathRootDir(),
topic + File.separator + queueId);
+ Assert.assertTrue(consumeQueueDir.exists());
+
+ consumeQueue.setMinLogicOffset(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ consumeQueue.setMaxPhysicOffset(max * messageSize);
+ consumeQueue.destroy();
+ consumeQueue = null;
+ FileUtils.deleteQuietly(consumeQueueDir);
+ Assert.assertFalse(consumeQueueDir.exists());
+
+ reloadedConsumeQueue = new ConsumeQueue(topic, queueId,
storeConfig.getStorePathRootDir(),
+ storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+ Assert.assertTrue(reloadedConsumeQueue.load());
+ reloadedConsumeQueue.recover();
+
reloadedConsumeQueue.setMinLogicOffset(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ reloadedConsumeQueue.setMaxPhysicOffset(max * messageSize);
+ reloadedConsumeQueue.correctMinOffset(0L);
+ Assert.assertEquals(0L, reloadedConsumeQueue.getMinLogicOffset());
+ Assert.assertEquals(-1L,
reloadedConsumeQueue.getMaxPhysicOffset());
+ } finally {
+ if (reloadedConsumeQueue != null) {
+ reloadedConsumeQueue.destroy();
+ }
+ if (consumeQueue != null) {
+ consumeQueue.destroy();
+ }
+ FileUtils.deleteQuietly(tmpDir);
+ }
+ }
}