This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8a589c6a53 [ISSUE #10453] Fix unexpected decrement of lmq counter 
(#10454)
8a589c6a53 is described below

commit 8a589c6a53301a0a3d5e85341f563bd26c107bb4
Author: imzs <[email protected]>
AuthorDate: Thu Jun 11 21:11:08 2026 +0800

    [ISSUE #10453] Fix unexpected decrement of lmq counter (#10454)
---
 .../queue/RocksDBConsumeQueueOffsetTable.java      |  6 +++++-
 .../queue/RocksDBConsumeQueueOffsetTableTest.java  | 23 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
index dc3712663c..f7bd4cda17 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
@@ -600,7 +600,11 @@ public class RocksDBConsumeQueueOffsetTable {
     private Long removeHeapMaxCqOffset(String topicQueueId) {
         Long prev = this.topicQueueMaxCqOffset.remove(topicQueueId);
         if (prev != null && topicQueueId.startsWith(MixAll.LMQ_PREFIX)) {
-            lmqCounter.decrementAndGet();
+            if (prev != -1L) {
+                lmqCounter.decrementAndGet();
+            } else {
+                ERROR_LOG.warn("remove lmq entry that was never actually used. 
{}", topicQueueId);
+            }
         }
         return prev;
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
index b8f415537e..1e0886fc15 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.queue.offset.OffsetEntryType;
 import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
 import org.junit.AfterClass;
@@ -38,6 +39,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -53,12 +55,18 @@ public class RocksDBConsumeQueueOffsetTableTest {
     @Mock
     private ConsumeQueueRocksDBStorage rocksDBStorage;
 
+    @Mock
+    private ColumnFamilyHandle columnFamilyHandle;
+
     @Mock
     private RocksDBConsumeQueueTable consumeQueueTable;
 
     @Mock
     private DefaultMessageStore messageStore;
 
+    @Mock
+    private MessageStoreConfig messageStoreConfig;
+
     private static RocksDB db;
 
     private static File dbPath;
@@ -90,6 +98,8 @@ public class RocksDBConsumeQueueOffsetTableTest {
     public void setUp() {
         RocksIterator iterator = db.newIterator();
         Mockito.doReturn(iterator).when(rocksDBStorage).seekOffsetCF();
+        
Mockito.doReturn(columnFamilyHandle).when(rocksDBStorage).getOffsetCFHandle();
+        
Mockito.doReturn(messageStoreConfig).when(messageStore).getMessageStoreConfig();
         offsetTable = new RocksDBConsumeQueueOffsetTable(consumeQueueTable, 
rocksDBStorage, messageStore);
     }
 
@@ -136,6 +146,19 @@ public class RocksDBConsumeQueueOffsetTableTest {
         Assert.assertEquals(initCount + lmqCount, offsetTable.getLmqNum());
     }
 
+    @Test
+    public void testLmqCounter_decrement() throws RocksDBException {
+        offsetTable.load();
+        int initCount = offsetTable.getLmqNum();
+        String topic = MixAll.LMQ_PREFIX + UUID.randomUUID();
+        Long maxOffset = offsetTable.getMaxCqOffset(topic, 0);
+        Assert.assertNull(maxOffset);
+        Assert.assertEquals(initCount, offsetTable.getLmqNum());
+
+        offsetTable.destroyOffset(topic, 0, new WriteBatch());
+        Assert.assertEquals(initCount, offsetTable.getLmqNum());
+    }
+
     private static void writeOffset(String topic, int queueId, long phyOffset,
         long cqOffset, boolean max) throws RocksDBException {
         byte[] topicInBytes = topic.getBytes(StandardCharsets.UTF_8);

Reply via email to