This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0d516ba70 [ISSUE #6473] Fix multi dispatch error when
enableMultiDispatch=true and enableLmq=true (#6476)
0d516ba70 is described below
commit 0d516ba708b13aabf206fea2dd184fea1999fdd5
Author: fujian-zfj <[email protected]>
AuthorDate: Wed Mar 29 14:05:07 2023 +0800
[ISSUE #6473] Fix multi dispatch error when enableMultiDispatch=true and
enableLmq=true (#6476)
* typo in readme[ecosystem]
* remove multi_dispatch to avoid building cq again
* if topic starts with %RETRY%, do not process INNER_MULTI_DISPATCH
* quickstart consumer test revert
---
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | 4 ++--
.../src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index ed20e39d4..d1c24ee35 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -723,7 +723,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
}
private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) {
- if
(!this.messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+ if (!this.messageStore.getMessageStoreConfig().isEnableMultiDispatch()
|| dispatchRequest.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return false;
}
Map<String, String> prop = dispatchRequest.getPropertiesMap();
@@ -791,7 +791,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
long queueOffset =
queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
msg.setQueueOffset(queueOffset);
// For LMQ
- if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+ if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() ||
msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return;
}
String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7798e89b8..117f20481 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2812,7 +2812,7 @@ public class DefaultMessageStore implements MessageStore {
private void notifyMessageArrive4MultiQueue(DispatchRequest
dispatchRequest) {
Map<String, String> prop = dispatchRequest.getPropertiesMap();
- if (prop == null) {
+ if (prop == null ||
dispatchRequest.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return;
}
String multiDispatchQueue =
prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);