This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8a589c6a53 [ISSUE #10453] Fix unexpected decrement of lmq counter (#10454)
8a589c6a53 is described below
commit 8a589c6a53301a0a3d5e85341f563bd26c107bb4
Author: imzs <[email protected]>
AuthorDate: Thu Jun 11 21:11:08 2026 +0800
[ISSUE #10453] Fix unexpected decrement of lmq counter (#10454)
---
.../queue/RocksDBConsumeQueueOffsetTable.java | 6 +++++-
.../queue/RocksDBConsumeQueueOffsetTableTest.java | 23 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
index dc3712663c..f7bd4cda17 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
@@ -600,7 +600,11 @@ public class RocksDBConsumeQueueOffsetTable {
private Long removeHeapMaxCqOffset(String topicQueueId) {
Long prev = this.topicQueueMaxCqOffset.remove(topicQueueId);
if (prev != null && topicQueueId.startsWith(MixAll.LMQ_PREFIX)) {
- lmqCounter.decrementAndGet();
+ if (prev != -1L) {
+ lmqCounter.decrementAndGet();
+ } else {
+ ERROR_LOG.warn("remove lmq entry that was never actually used. {}", topicQueueId);
+ }
}
return prev;
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
index b8f415537e..1e0886fc15 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.queue.offset.OffsetEntryType;
import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
import org.junit.AfterClass;
@@ -38,6 +39,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -53,12 +55,18 @@ public class RocksDBConsumeQueueOffsetTableTest {
@Mock
private ConsumeQueueRocksDBStorage rocksDBStorage;
+ @Mock
+ private ColumnFamilyHandle columnFamilyHandle;
+
@Mock
private RocksDBConsumeQueueTable consumeQueueTable;
@Mock
private DefaultMessageStore messageStore;
+ @Mock
+ private MessageStoreConfig messageStoreConfig;
+
private static RocksDB db;
private static File dbPath;
@@ -90,6 +98,8 @@ public class RocksDBConsumeQueueOffsetTableTest {
public void setUp() {
RocksIterator iterator = db.newIterator();
Mockito.doReturn(iterator).when(rocksDBStorage).seekOffsetCF();
+ Mockito.doReturn(columnFamilyHandle).when(rocksDBStorage).getOffsetCFHandle();
+ Mockito.doReturn(messageStoreConfig).when(messageStore).getMessageStoreConfig();
offsetTable = new RocksDBConsumeQueueOffsetTable(consumeQueueTable, rocksDBStorage, messageStore);
}
@@ -136,6 +146,19 @@ public class RocksDBConsumeQueueOffsetTableTest {
Assert.assertEquals(initCount + lmqCount, offsetTable.getLmqNum());
}
+ @Test
+ public void testLmqCounter_decrement() throws RocksDBException {
+ offsetTable.load();
+ int initCount = offsetTable.getLmqNum();
+ String topic = MixAll.LMQ_PREFIX + UUID.randomUUID();
+ Long maxOffset = offsetTable.getMaxCqOffset(topic, 0);
+ Assert.assertNull(maxOffset);
+ Assert.assertEquals(initCount, offsetTable.getLmqNum());
+
+ offsetTable.destroyOffset(topic, 0, new WriteBatch());
+ Assert.assertEquals(initCount, offsetTable.getLmqNum());
+ }
+
private static void writeOffset(String topic, int queueId, long phyOffset,
long cqOffset, boolean max) throws RocksDBException {
byte[] topicInBytes = topic.getBytes(StandardCharsets.UTF_8);