This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 737c1e5338 [ISSUE #7029] Add a config to determine whether pop 
response should return the actual retry topic or tamper with the original topic 
(#7030)
737c1e5338 is described below

commit 737c1e53383350a5671fa207ee0e4ce932850bac
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 18 14:12:39 2023 +0800

    [ISSUE #7029] Add a config to determine whether pop response should return 
the actual retry topic or tamper with the original topic (#7030)
---
 .../rocketmq/broker/processor/PopMessageProcessor.java      |  4 ++--
 .../main/java/org/apache/rocketmq/common/BrokerConfig.java  | 13 +++++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 464f8f4fda..53e1725614 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -591,8 +591,8 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                 atomicRestNum.set(result.getMaxOffset() - 
result.getNextBeginOffset() + atomicRestNum.get());
                 String brokerName = 
brokerController.getBrokerConfig().getBrokerName();
                 for (SelectMappedBufferResult mapedBuffer : 
result.getMessageMapedList()) {
-                    // We should not recode buffer for normal topic message
-                    if (!isRetry) {
+                    // We should not recode buffer when 
popResponseReturnActualRetryTopic is true or topic is not retry topic
+                    if 
(brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || 
!isRetry) {
                         getMessageResult.addMessage(mapedBuffer);
                     } else {
                         List<MessageExt> messageExtList = 
MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f5f0db1016..a4d82d1c53 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -381,6 +381,11 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private long fetchNamesrvAddrInterval = 10 * 1000;
 
+    /**
+     * Pop response returns the actual retry topic rather than tampering with 
the original topic
+     */
+    private boolean popResponseReturnActualRetryTopic = false;
+
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
     }
@@ -1676,4 +1681,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setFetchNamesrvAddrInterval(final long 
fetchNamesrvAddrInterval) {
         this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
     }
+
+    public boolean isPopResponseReturnActualRetryTopic() {
+        return popResponseReturnActualRetryTopic;
+    }
+
+    public void setPopResponseReturnActualRetryTopic(boolean 
popResponseReturnActualRetryTopic) {
+        this.popResponseReturnActualRetryTopic = 
popResponseReturnActualRetryTopic;
+    }
 }

Reply via email to