This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2e6632ff40 [ISSUE #10417] Fix stale minOffset after consumeQueue 
truncation (#10418)
2e6632ff40 is described below

commit 2e6632ff404b6210057914cf0530f53d6531ab99
Author: chenxu80 <[email protected]>
AuthorDate: Tue Jun 23 17:10:16 2026 +0800

    [ISSUE #10417] Fix stale minOffset after consumeQueue truncation (#10418)
---
 .../org/apache/rocketmq/store/ConsumeQueue.java    | 17 +++---
 .../apache/rocketmq/store/ConsumeQueueTest.java    | 62 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 5cba37b8ce..0d698dacfe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -552,17 +552,18 @@ public class ConsumeQueue implements 
ConsumeQueueInterface {
      */
     @Override
     public void correctMinOffset(long minCommitLogOffset) {
-        // Check if the consume queue is the state of deprecation.
-        if (minLogicOffset >= mappedFileQueue.getMaxOffset()) {
-            log.info("ConsumeQueue[Topic={}, queue-id={}] contains no valid 
entries", topic, queueId);
+        MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
+        if (null == lastMappedFile) {
+            log.info("ConsumeQueue[Topic={}, queue-id={}] contains no entry,"
+                    + " reset minLogicOffset to 0, maxPhysicOffset to -1", 
topic, queueId);
+            this.minLogicOffset = 0;
+            this.maxPhysicOffset = -1;
             return;
         }
 
-        // Check whether the consume queue maps no valid data at all. This 
check may cost 1 IO operation.
-        // The rationale is that consume queue always preserves the last file. 
In case there are many deprecated topics,
-        // This check would save a lot of efforts.
-        MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
-        if (null == lastMappedFile) {
+        // Check if the consume queue is the state of deprecation.
+        if (minLogicOffset > mappedFileQueue.getMaxOffset()) {
+            log.warn("ConsumeQueue[Topic={}, queue-id={}] contains no valid 
entries", topic, queueId);
             return;
         }
 
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 3ad66b3cef..e8e3797d02 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
+import java.nio.file.Files;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -713,4 +714,65 @@ public class ConsumeQueueTest {
         consumeQueue.destroy();
         FileUtils.deleteDirectory(tmpDir);
     }
+
+    @Test
+    public void testCorrectMinOffsetAfterAllFilesDeleted() throws IOException {
+        String topic = "T1";
+        int queueId = 0;
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        File tmpDir = 
Files.createTempDirectory("testCorrectMinOffsetAfterAllFilesDeleted").toFile();
+        storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+        storeConfig.setEnableConsumeQueueExt(false);
+        DefaultMessageStore messageStore = 
Mockito.mock(DefaultMessageStore.class);
+        
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+        RunningFlags runningFlags = new RunningFlags();
+        Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+        StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+        
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+        ConsumeQueue consumeQueue = null;
+        ConsumeQueue reloadedConsumeQueue = null;
+        try {
+            consumeQueue = new ConsumeQueue(topic, queueId, 
storeConfig.getStorePathRootDir(),
+                storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+            int max = 10;
+            int messageSize = 100;
+            for (int i = 0; i < max; ++i) {
+                DispatchRequest dispatchRequest = new DispatchRequest(topic, 
queueId, messageSize * i, messageSize, 0, 0, i,
+                    null, null, 0, 0, null);
+                consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+            }
+
+            File consumeQueueDir = new File(storeConfig.getStorePathRootDir(), 
topic + File.separator + queueId);
+            Assert.assertTrue(consumeQueueDir.exists());
+
+            consumeQueue.setMinLogicOffset(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            consumeQueue.setMaxPhysicOffset(max * messageSize);
+            consumeQueue.destroy();
+            consumeQueue = null;
+            FileUtils.deleteQuietly(consumeQueueDir);
+            Assert.assertFalse(consumeQueueDir.exists());
+
+            reloadedConsumeQueue = new ConsumeQueue(topic, queueId, 
storeConfig.getStorePathRootDir(),
+                storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+            Assert.assertTrue(reloadedConsumeQueue.load());
+            reloadedConsumeQueue.recover();
+            
reloadedConsumeQueue.setMinLogicOffset(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            reloadedConsumeQueue.setMaxPhysicOffset(max * messageSize);
+            reloadedConsumeQueue.correctMinOffset(0L);
+            Assert.assertEquals(0L, reloadedConsumeQueue.getMinLogicOffset());
+            Assert.assertEquals(-1L, 
reloadedConsumeQueue.getMaxPhysicOffset());
+        } finally {
+            if (reloadedConsumeQueue != null) {
+                reloadedConsumeQueue.destroy();
+            }
+            if (consumeQueue != null) {
+                consumeQueue.destroy();
+            }
+            FileUtils.deleteQuietly(tmpDir);
+        }
+    }
 }

Reply via email to