This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 79e700312a [ISSUE #9945] Use UniqueKey as TimerDelKey by default
79e700312a is described below
commit 79e700312a7ba00b4a8e42fede9a89a56eff9640
Author: imzs <[email protected]>
AuthorDate: Wed Dec 24 10:00:01 2025 +0800
[ISSUE #9945] Use UniqueKey as TimerDelKey by default
---
.../broker/processor/RecallMessageProcessor.java | 4 ++--
.../processor/RecallMessageProcessorTest.java | 19 ++++++++++++++++++-
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++++
.../rocketmq/store/timer/TimerMessageStore.java | 8 ++++----
.../rocketmq/store/timer/TimerMessageStoreTest.java | 8 +++++---
.../test/recall/SendAndRecallDelayMessageIT.java | 21 +++++++++++++++++++++
6 files changed, 60 insertions(+), 10 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
index 372db0d36e..fd537c3c9d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
@@ -135,8 +135,8 @@ public class RecallMessageProcessor implements
NettyRequestProcessor {
msgInner.setTags(RECALL_MESSAGE_TAG);
msgInner.setTagsCode(RECALL_MESSAGE_TAG.hashCode());
msgInner.setQueueId(0);
- MessageAccessor.putProperty(msgInner,
MessageConst.PROPERTY_TIMER_DEL_UNIQKEY,
- TimerMessageStore.buildDeleteKey(handle.getTopic(),
handle.getMessageId()));
+ MessageAccessor.putProperty(msgInner,
MessageConst.PROPERTY_TIMER_DEL_UNIQKEY, TimerMessageStore.buildDeleteKey(
+ handle.getTopic(), handle.getMessageId(),
brokerController.getMessageStoreConfig().isAppendTopicForTimerDeleteKey()));
MessageAccessor.putProperty(msgInner,
MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
handle.getMessageId());
MessageAccessor.putProperty(msgInner,
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
index d28eb2f1df..f35870427c 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
@@ -96,7 +96,8 @@ public class RecallMessageProcessorTest {
}
@Test
- public void testBuildMessage() {
+ public void testBuildMessage_withNamespace() {
+
when(messageStoreConfig.isAppendTopicForTimerDeleteKey()).thenReturn(true);
String timestampStr = String.valueOf(System.currentTimeMillis());
String id = "id";
RecallMessageHandle.HandleV1 handle = new
RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
@@ -110,6 +111,22 @@ public class RecallMessageProcessorTest {
Assert.assertEquals(TOPIC + "+" + id,
properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
}
+ @Test
+ public void testBuildMessage_withoutNamespace() {
+
when(messageStoreConfig.isAppendTopicForTimerDeleteKey()).thenReturn(false);
+ String timestampStr = String.valueOf(System.currentTimeMillis());
+ String id = "id";
+ RecallMessageHandle.HandleV1 handle = new
RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
+ MessageExtBrokerInner msg =
+ recallMessageProcessor.buildMessage(handlerContext, new
RecallMessageRequestHeader(), handle);
+
+ Assert.assertEquals(TOPIC, msg.getTopic());
+ Map<String, String> properties =
MessageDecoder.string2messageProperties(msg.getPropertiesString());
+ Assert.assertEquals(timestampStr,
properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS));
+ Assert.assertEquals(id,
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ Assert.assertEquals(id,
properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
+ }
+
@Test
public void testHandlePutMessageResult() {
MessageExt message = new MessageExt();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9670f40d92..ad77319264 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -533,6 +533,8 @@ public class MessageStoreConfig {
private boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover = false;
+ private boolean appendTopicForTimerDeleteKey = false;
+
public boolean isRocksdbCQDoubleWriteEnable() {
return rocksdbCQDoubleWriteEnable;
}
@@ -2236,4 +2238,12 @@ public class MessageStoreConfig {
public void setSharedByteBufferNum(int sharedByteBufferNum) {
this.sharedByteBufferNum = sharedByteBufferNum;
}
+
+ public boolean isAppendTopicForTimerDeleteKey() {
+ return appendTopicForTimerDeleteKey;
+ }
+
+ public void setAppendTopicForTimerDeleteKey(boolean
appendTopicForTimerDeleteKey) {
+ this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index a32b4a3f21..5fee8da6d0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1741,7 +1741,7 @@ public class TimerMessageStore {
isRound = false;
}
if (null != uniqueKey &&
tr.getDeleteList() != null && tr.getDeleteList().size() > 0
- &&
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
+ &&
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey,
storeConfig.isAppendTopicForTimerDeleteKey()))) {
avoidDeleteLose.remove(uniqueKey);
doRes = true;
tr.idempotentRelease();
@@ -2074,9 +2074,9 @@ public class TimerMessageStore {
return timerCheckpoint;
}
- // identify a message by topic + uk, like query operation
- public static String buildDeleteKey(String realTopic, String uniqueKey) {
- return realTopic + "+" + uniqueKey;
+ // identify a message by topic or topic + uk(like query operation)
+ public static String buildDeleteKey(String realTopic, String uniqueKey,
Boolean appendTopicForTimerDeleteKey) {
+ return appendTopicForTimerDeleteKey ? (realTopic + "+" + uniqueKey) :
uniqueKey;
}
private void recallToTimeline(long delayTime, long offsetPy, int sizePy,
MessageExt messageExt) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index a014e77b90..fe1a1177c6 100644
---
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -110,6 +110,7 @@ public class TimerMessageStoreTest {
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
storeConfig.setTimerInterceptDelayLevel(true);
storeConfig.setTimerPrecisionMs(precisionMs);
+ storeConfig.setAppendTopicForTimerDeleteKey(false); // reset default
value
mockMessageStore = Mockito.mock(MessageStore.class);
messageStore = new DefaultMessageStore(storeConfig, new
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new
BrokerConfig(), new ConcurrentHashMap<>());
@@ -358,7 +359,7 @@ public class TimerMessageStoreTest {
MessageExtBrokerInner delMsg = buildMessage(delayMs, topic, false);
transformTimerMessage(timerMessageStore,delMsg);
- MessageAccessor.putProperty(delMsg,
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY,
TimerMessageStore.buildDeleteKey(topic, uniqKey));
+ MessageAccessor.putProperty(delMsg,
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY,
TimerMessageStore.buildDeleteKey(topic, uniqKey, false));
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
assertEquals(PutMessageStatus.PUT_OK,
messageStore.putMessage(delMsg).getPutMessageStatus());
@@ -375,6 +376,7 @@ public class TimerMessageStoreTest {
@Test
public void testDeleteTimerMessage_ukCollision() throws Exception {
+ storeConfig.setAppendTopicForTimerDeleteKey(true); // append topic as
namespace
String topic = "TimerTest_testDeleteTimerMessage";
String collisionTopic = "TimerTest_testDeleteTimerMessage_collision";
@@ -397,13 +399,13 @@ public class TimerMessageStoreTest {
MessageExtBrokerInner delMsg = buildMessage(delayMs, "whatever",
false);
transformTimerMessage(timerMessageStore, delMsg);
- MessageAccessor.putProperty(delMsg,
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY,
TimerMessageStore.buildDeleteKey(topic, firstUniqKey));
+ MessageAccessor.putProperty(delMsg,
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY,
TimerMessageStore.buildDeleteKey(topic, firstUniqKey, true));
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
assertEquals(PutMessageStatus.PUT_OK,
messageStore.putMessage(delMsg).getPutMessageStatus());
delMsg = buildMessage(delayMs, "whatever", false);
transformTimerMessage(timerMessageStore, delMsg);
- MessageAccessor.putProperty(delMsg,
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY,
TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey));
+ MessageAccessor.putProperty(delMsg,
TimerMessageStore.TIMER_DELETE_UNIQUE_KEY,
TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey, true));
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
assertEquals(PutMessageStatus.PUT_OK,
messageStore.putMessage(delMsg).getPutMessageStatus());
diff --git
a/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
b/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
index ec73226a9e..3bcf20953b 100644
---
a/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java
@@ -42,7 +42,10 @@ import org.apache.rocketmq.test.util.MQRandomUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class SendAndRecallDelayMessageIT extends BaseConf {
private static String initTopic;
@@ -50,8 +53,23 @@ public class SendAndRecallDelayMessageIT extends BaseConf {
private static RMQNormalProducer producer;
private static RMQPopConsumer popConsumer;
+ private final boolean appendTopicForTimerDeleteKey;
+
+ public SendAndRecallDelayMessageIT(boolean appendTopicForTimerDeleteKey) {
+ this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
+ }
+
+ @Parameterized.Parameters
+ public static List<Object[]> params() {
+ List<Object[]> result = new ArrayList<>();
+ result.add(new Object[] {false});
+ result.add(new Object[] {true});
+ return result;
+ }
+
@Before
public void init() {
+
brokerController1.getMessageStoreConfig().setAppendTopicForTimerDeleteKey(appendTopicForTimerDeleteKey);
initTopic = initTopic();
consumerGroup = initConsumerGroup();
producer = getProducer(NAMESRV_ADDR, initTopic);
@@ -126,6 +144,9 @@ public class SendAndRecallDelayMessageIT extends BaseConf {
@Test
public void testSendAndRecall_ukCollision() throws Exception {
+ if (!appendTopicForTimerDeleteKey) { // skip
+ return;
+ }
int delaySecond = 5;
String topic = MQRandomUtils.getRandomTopic();
String collisionTopic = MQRandomUtils.getRandomTopic();