This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0d516ba70 [ISSUE #6473] Fix multi dispatch error when 
enableMultiDispatch=true and enableLmq=true (#6476)
0d516ba70 is described below

commit 0d516ba708b13aabf206fea2dd184fea1999fdd5
Author: fujian-zfj <[email protected]>
AuthorDate: Wed Mar 29 14:05:07 2023 +0800

    [ISSUE #6473] Fix multi dispatch error when enableMultiDispatch=true and 
enableLmq=true (#6476)
    
    * typo int readme[ecosystem]
    
    * remove multi_dispatch to avoid building cq again
    
    * if topic starts with %RETRY%, do not process INNER_MULTI_DISPATCH
    
    * quickstart consumer test revert
---
 store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java       | 4 ++--
 .../src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index ed20e39d4..d1c24ee35 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -723,7 +723,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
     }
 
     private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) {
-        if 
(!this.messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+        if (!this.messageStore.getMessageStoreConfig().isEnableMultiDispatch() 
|| dispatchRequest.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
             return false;
         }
         Map<String, String> prop = dispatchRequest.getPropertiesMap();
@@ -791,7 +791,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
         long queueOffset = 
queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
         msg.setQueueOffset(queueOffset);
         // For LMQ
-        if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+        if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || 
msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
             return;
         }
         String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7798e89b8..117f20481 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2812,7 +2812,7 @@ public class DefaultMessageStore implements MessageStore {
 
         private void notifyMessageArrive4MultiQueue(DispatchRequest 
dispatchRequest) {
             Map<String, String> prop = dispatchRequest.getPropertiesMap();
-            if (prop == null) {
+            if (prop == null || 
dispatchRequest.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                 return;
             }
             String multiDispatchQueue = 
prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);

Reply via email to