This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bfbbb2a955 [#10421] Fix Timer message rocksdb use wrong cache key. (#10422)
bfbbb2a955 is described below

commit bfbbb2a95535374d285e97bf2d159e00b6e26dc8
Author: echooymxq <[email protected]>
AuthorDate: Sat Jun 20 10:44:49 2026 +0800

    [#10421] Fix Timer message rocksdb use wrong cache key. (#10422)
---
 .../store/rocksdb/MessageRocksDBStorage.java       |  10 +-
 .../store/rocksdb/MessageRocksDBStorageTest.java   | 143 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
index 8d32998bdc..d55596a293 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
@@ -84,7 +84,7 @@ public class MessageRocksDBStorage extends AbstractRocksDBStorage {
     private volatile ColumnFamilyHandle transCFHandle;
 
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-    private static final Cache<byte[], byte[]> DELETE_KEY_CACHE_FOR_TIMER = CacheBuilder.newBuilder()
+    private static final Cache<String, byte[]> DELETE_KEY_CACHE_FOR_TIMER = CacheBuilder.newBuilder()
         .maximumSize(10000)
         .expireAfterWrite(60, TimeUnit.MINUTES)
         .build();
@@ -354,9 +354,9 @@ public class MessageRocksDBStorage extends AbstractRocksDBStorage {
                         writeBatch.put(cfHandle, keyBytes, valueBytes);
                     } else if (record.getActionFlag() == TIMER_ROCKSDB_DELETE) {
                         writeBatch.delete(cfHandle, keyBytes);
-                        DELETE_KEY_CACHE_FOR_TIMER.put(keyBytes, DELETE_VAL_FLAG);
+                        DELETE_KEY_CACHE_FOR_TIMER.put(getTimerCacheKey(record.getDelayTime(), record.getUniqKey()), DELETE_VAL_FLAG);
                     } else if (record.getActionFlag() == TIMER_ROCKSDB_UPDATE) {
-                        byte[] deleteByte = DELETE_KEY_CACHE_FOR_TIMER.getIfPresent(keyBytes);
+                        byte[] deleteByte = DELETE_KEY_CACHE_FOR_TIMER.getIfPresent(getTimerCacheKey(record.getDelayTime(), record.getUniqKey()));
                         if (null == deleteByte) {
                             writeBatch.put(cfHandle, keyBytes, valueBytes);
                         }
@@ -373,6 +373,10 @@ public class MessageRocksDBStorage extends AbstractRocksDBStorage {
         }
     }
 
+    private static String getTimerCacheKey(long delayTime, String uniqKey) {
+        return delayTime + ":" + uniqKey;
+    }
+
     public List<TimerRocksDBRecord> scanRecordsForTimer(byte[] columnFamily, long lowerTime, long upperTime, int size, byte[] startKey) {
         ColumnFamilyHandle cfHandle = getColumnFamily(columnFamily);
         if (null == cfHandle || lowerTime <= 0L || upperTime <= 0L || lowerTime > upperTime || size <= 0) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorageTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorageTest.java
new file mode 100644
index 0000000000..d28ef19f54
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorageTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.rocksdb;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.rocksdb.TimerRocksDBRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.rocketmq.store.rocksdb.MessageRocksDBStorage.TIMER_COLUMN_FAMILY;
+
+public class MessageRocksDBStorageTest {
+
+    private MessageRocksDBStorage storage;
+    private String storePath;
+
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("java.io.tmpdir") + File.separator + "message_rocksdb_test_" + System.currentTimeMillis();
+        MessageStoreConfig config = new MessageStoreConfig();
+        config.setStorePathRootDir(storePath);
+        storage = new MessageRocksDBStorage(config);
+    }
+
+    @After
+    public void tearDown() {
+        if (null != storage) {
+            storage.shutdown();
+        }
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    @Test
+    public void testPutThenDelete() {
+        long delayTime = System.currentTimeMillis() + 3600000L;
+        String uniqKey = "0A0A0A0A00002A9F0000000000000003";
+
+        TimerRocksDBRecord putRecord = new TimerRocksDBRecord(delayTime, uniqKey, 100L, 200, 0L, null);
+        putRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_PUT);
+
+        List<TimerRocksDBRecord> putList = new ArrayList<>();
+        putList.add(putRecord);
+        storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, putList);
+
+        TimerRocksDBRecord deleteRecord = new TimerRocksDBRecord(delayTime, uniqKey, 100L, 200, 0L, null);
+        deleteRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_DELETE);
+
+        List<TimerRocksDBRecord> deleteList = new ArrayList<>();
+        deleteList.add(deleteRecord);
+        storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, deleteList);
+
+        List<TimerRocksDBRecord> result = storage.scanRecordsForTimer(
+            TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+
+        Assert.assertTrue(null == result || result.isEmpty());
+    }
+
+    @Test
+    public void testPutThenUpdate() {
+        long delayTime = System.currentTimeMillis() + 3600000L;
+        String uniqKey = "0A0A0A0A00002A9F0000000000000004";
+
+        TimerRocksDBRecord putRecord = new TimerRocksDBRecord(delayTime, uniqKey, 100L, 200, 0L, null);
+        putRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_PUT);
+
+        List<TimerRocksDBRecord> putList = new ArrayList<>();
+        putList.add(putRecord);
+        storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, putList);
+
+        TimerRocksDBRecord updateRecord = new TimerRocksDBRecord(delayTime, uniqKey, 200L, 300, 1L, null);
+        updateRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_UPDATE);
+
+        List<TimerRocksDBRecord> updateList = new ArrayList<>();
+        updateList.add(updateRecord);
+        storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, updateList);
+
+        List<TimerRocksDBRecord> result = storage.scanRecordsForTimer(
+            TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+
+        Assert.assertNotNull("PUT then UPDATE should have 1 record", result);
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals(200L, result.get(0).getOffsetPy());
+        Assert.assertEquals(300, result.get(0).getSizePy());
+    }
+
+    @Test
+    public void testDeleteThenUpdate() {
+        long delayTime = System.currentTimeMillis() + 3600000L;
+        String uniqKey = "0A0A0A0A00002A9F0000000000000001";
+
+        TimerRocksDBRecord putRecord = new TimerRocksDBRecord(delayTime, uniqKey, 100L, 200, 0L, null);
+        putRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_PUT);
+
+        List<TimerRocksDBRecord> putList = new ArrayList<>();
+        putList.add(putRecord);
+        storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, putList);
+
+        List<TimerRocksDBRecord> scanAfterPut = storage.scanRecordsForTimer(
+            TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+        Assert.assertNotNull("PUT should create a record in RocksDB", scanAfterPut);
+        Assert.assertEquals(1, scanAfterPut.size());
+
+        TimerRocksDBRecord deleteRecord = new TimerRocksDBRecord(delayTime, uniqKey, 100L, 200, 0L, null);
+        deleteRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_DELETE);
+
+        TimerRocksDBRecord updateRecord = new TimerRocksDBRecord(delayTime, uniqKey, 200L, 300, 1L, null);
+        updateRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_UPDATE);
+
+        List<TimerRocksDBRecord> cudList = new ArrayList<>();
+        cudList.add(deleteRecord);
+        cudList.add(updateRecord);
+        storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, cudList);
+
+        List<TimerRocksDBRecord> resultAfterDeleteUpdate = storage.scanRecordsForTimer(
+            TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+
+        int recordCount = null == resultAfterDeleteUpdate ? 0 : resultAfterDeleteUpdate.size();
+        Assert.assertEquals(0, recordCount);
+    }
+
+}

Reply via email to