This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 79e700312a [ISSUE #9945] Use UniqueKey as TimerDelKey by default
79e700312a is described below

commit 79e700312a7ba00b4a8e42fede9a89a56eff9640
Author: imzs <[email protected]>
AuthorDate: Wed Dec 24 10:00:01 2025 +0800

    [ISSUE #9945] Use UniqueKey as TimerDelKey by default
---
 .../broker/processor/RecallMessageProcessor.java    |  4 ++--
 .../processor/RecallMessageProcessorTest.java       | 19 ++++++++++++++++++-
 .../rocketmq/store/config/MessageStoreConfig.java   | 10 ++++++++++
 .../rocketmq/store/timer/TimerMessageStore.java     |  8 ++++----
 .../rocketmq/store/timer/TimerMessageStoreTest.java |  8 +++++---
 .../test/recall/SendAndRecallDelayMessageIT.java    | 21 +++++++++++++++++++++
 6 files changed, 60 insertions(+), 10 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
index 372db0d36e..fd537c3c9d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
@@ -135,8 +135,8 @@ public class RecallMessageProcessor implements 
NettyRequestProcessor {
         msgInner.setTags(RECALL_MESSAGE_TAG);
         msgInner.setTagsCode(RECALL_MESSAGE_TAG.hashCode());
         msgInner.setQueueId(0);
-        MessageAccessor.putProperty(msgInner, 
MessageConst.PROPERTY_TIMER_DEL_UNIQKEY,
-            TimerMessageStore.buildDeleteKey(handle.getTopic(), 
handle.getMessageId()));
+        MessageAccessor.putProperty(msgInner, 
MessageConst.PROPERTY_TIMER_DEL_UNIQKEY, TimerMessageStore.buildDeleteKey(
+            handle.getTopic(), handle.getMessageId(), 
brokerController.getMessageStoreConfig().isAppendTopicForTimerDeleteKey()));
         MessageAccessor.putProperty(msgInner,
             MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, 
handle.getMessageId());
         MessageAccessor.putProperty(msgInner,
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
index d28eb2f1df..f35870427c 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
@@ -96,7 +96,8 @@ public class RecallMessageProcessorTest {
     }
 
     @Test
-    public void testBuildMessage() {
+    public void testBuildMessage_withNamespace() {
+        
when(messageStoreConfig.isAppendTopicForTimerDeleteKey()).thenReturn(true);
         String timestampStr = String.valueOf(System.currentTimeMillis());
         String id = "id";
         RecallMessageHandle.HandleV1 handle = new 
RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
@@ -110,6 +111,22 @@ public class RecallMessageProcessorTest {
         Assert.assertEquals(TOPIC + "+" + id, 
properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
     }
 
+    @Test
+    public void testBuildMessage_withoutNamespace() {
+        
when(messageStoreConfig.isAppendTopicForTimerDeleteKey()).thenReturn(false);
+        String timestampStr = String.valueOf(System.currentTimeMillis());
+        String id = "id";
+        RecallMessageHandle.HandleV1 handle = new 
RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
+        MessageExtBrokerInner msg =
+            recallMessageProcessor.buildMessage(handlerContext, new 
RecallMessageRequestHeader(), handle);
+
+        Assert.assertEquals(TOPIC, msg.getTopic());
+        Map<String, String> properties = 
MessageDecoder.string2messageProperties(msg.getPropertiesString());
+        Assert.assertEquals(timestampStr, 
properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS));
+        Assert.assertEquals(id, 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+        Assert.assertEquals(id, 
properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
+    }
+
     @Test
     public void testHandlePutMessageResult() {
         MessageExt message = new MessageExt();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9670f40d92..ad77319264 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -533,6 +533,8 @@ public class MessageStoreConfig {
 
     private boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover = false;
 
+    private boolean appendTopicForTimerDeleteKey = false;
+
     public boolean isRocksdbCQDoubleWriteEnable() {
         return rocksdbCQDoubleWriteEnable;
     }
@@ -2236,4 +2238,12 @@ public class MessageStoreConfig {
     public void setSharedByteBufferNum(int sharedByteBufferNum) {
         this.sharedByteBufferNum = sharedByteBufferNum;
     }
+
+    public boolean isAppendTopicForTimerDeleteKey() {
+        return appendTopicForTimerDeleteKey;
+    }
+
+    public void setAppendTopicForTimerDeleteKey(boolean 
appendTopicForTimerDeleteKey) {
+        this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index a32b4a3f21..5fee8da6d0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1741,7 +1741,7 @@ public class TimerMessageStore {
                                         isRound = false;
                                     }
                                     if (null != uniqueKey && 
tr.getDeleteList() != null && tr.getDeleteList().size() > 0
-                                        && 
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
+                                        && 
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey, 
storeConfig.isAppendTopicForTimerDeleteKey()))) {
                                         avoidDeleteLose.remove(uniqueKey);
                                         doRes = true;
                                         tr.idempotentRelease();
@@ -2074,9 +2074,9 @@ public class TimerMessageStore {
         return timerCheckpoint;
     }
 
-    // identify a message by topic + uk, like query operation
-    public static String buildDeleteKey(String realTopic, String uniqueKey) {
-        return realTopic + "+" + uniqueKey;
+    // identify a message by uk alone, or by topic + uk (like query operation)
+    public static String buildDeleteKey(String realTopic, String uniqueKey, 
Boolean appendTopicForTimerDeleteKey) {
+        return appendTopicForTimerDeleteKey ? (realTopic + "+" + uniqueKey) : 
uniqueKey;
     }
 
     private void recallToTimeline(long delayTime, long offsetPy, int sizePy, 
MessageExt messageExt) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index a014e77b90..fe1a1177c6 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -110,6 +110,7 @@ public class TimerMessageStoreTest {
         storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
         storeConfig.setTimerInterceptDelayLevel(true);
         storeConfig.setTimerPrecisionMs(precisionMs);
+        storeConfig.setAppendTopicForTimerDeleteKey(false); // reset default 
value
 
         mockMessageStore = Mockito.mock(MessageStore.class);
         messageStore = new DefaultMessageStore(storeConfig, new 
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new 
BrokerConfig(), new ConcurrentHashMap<>());
@@ -358,7 +359,7 @@ public class TimerMessageStoreTest {
 
         MessageExtBrokerInner delMsg = buildMessage(delayMs, topic, false);
         transformTimerMessage(timerMessageStore,delMsg);
-        MessageAccessor.putProperty(delMsg, 
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, 
TimerMessageStore.buildDeleteKey(topic, uniqKey));
+        MessageAccessor.putProperty(delMsg, 
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, 
TimerMessageStore.buildDeleteKey(topic, uniqKey, false));
         
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
         assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(delMsg).getPutMessageStatus());
 
@@ -375,6 +376,7 @@ public class TimerMessageStoreTest {
 
     @Test
     public void testDeleteTimerMessage_ukCollision() throws Exception {
+        storeConfig.setAppendTopicForTimerDeleteKey(true); // append topic as 
namespace
         String topic = "TimerTest_testDeleteTimerMessage";
         String collisionTopic = "TimerTest_testDeleteTimerMessage_collision";
 
@@ -397,13 +399,13 @@ public class TimerMessageStoreTest {
 
         MessageExtBrokerInner delMsg = buildMessage(delayMs, "whatever", 
false);
         transformTimerMessage(timerMessageStore, delMsg);
-        MessageAccessor.putProperty(delMsg, 
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, 
TimerMessageStore.buildDeleteKey(topic, firstUniqKey));
+        MessageAccessor.putProperty(delMsg, 
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, 
TimerMessageStore.buildDeleteKey(topic, firstUniqKey, true));
         
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
         assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(delMsg).getPutMessageStatus());
 
         delMsg = buildMessage(delayMs, "whatever", false);
         transformTimerMessage(timerMessageStore, delMsg);
-        MessageAccessor.putProperty(delMsg, 
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, 
TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey));
+        MessageAccessor.putProperty(delMsg, 
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, 
TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey, true));
         
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
         assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(delMsg).getPutMessageStatus());
 
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
index ec73226a9e..3bcf20953b 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
@@ -42,7 +42,10 @@ import org.apache.rocketmq.test.util.MQRandomUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class SendAndRecallDelayMessageIT extends BaseConf {
 
     private static String initTopic;
@@ -50,8 +53,23 @@ public class SendAndRecallDelayMessageIT extends BaseConf {
     private static RMQNormalProducer producer;
     private static RMQPopConsumer popConsumer;
 
+    private final boolean appendTopicForTimerDeleteKey;
+
+    public SendAndRecallDelayMessageIT(boolean appendTopicForTimerDeleteKey) {
+        this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
+    }
+
+    @Parameterized.Parameters
+    public static List<Object[]> params() {
+        List<Object[]> result = new ArrayList<>();
+        result.add(new Object[] {false});
+        result.add(new Object[] {true});
+        return result;
+    }
+
     @Before
     public void init() {
+        
brokerController1.getMessageStoreConfig().setAppendTopicForTimerDeleteKey(appendTopicForTimerDeleteKey);
         initTopic = initTopic();
         consumerGroup = initConsumerGroup();
         producer = getProducer(NAMESRV_ADDR, initTopic);
@@ -126,6 +144,9 @@ public class SendAndRecallDelayMessageIT extends BaseConf {
 
     @Test
     public void testSendAndRecall_ukCollision() throws Exception {
+        if (!appendTopicForTimerDeleteKey) { // skip
+            return;
+        }
         int delaySecond = 5;
         String topic = MQRandomUtils.getRandomTopic();
         String collisionTopic = MQRandomUtils.getRandomTopic();

Reply via email to