This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 737c1e5338 [ISSUE #7029] Add a config to determine whether pop
response should return the actual retry topic or tamper with the original topic
(#7030)
737c1e5338 is described below
commit 737c1e53383350a5671fa207ee0e4ce932850bac
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 18 14:12:39 2023 +0800
[ISSUE #7029] Add a config to determine whether pop response should return
the actual retry topic or tamper with the original topic (#7030)
---
.../rocketmq/broker/processor/PopMessageProcessor.java | 4 ++--
.../main/java/org/apache/rocketmq/common/BrokerConfig.java | 13 +++++++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 464f8f4fda..53e1725614 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -591,8 +591,8 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
atomicRestNum.set(result.getMaxOffset() -
result.getNextBeginOffset() + atomicRestNum.get());
String brokerName =
brokerController.getBrokerConfig().getBrokerName();
for (SelectMappedBufferResult mapedBuffer :
result.getMessageMapedList()) {
- // We should not recode buffer for normal topic message
- if (!isRetry) {
+ // We should not recode buffer when
popResponseReturnActualRetryTopic is true or topic is not retry topic
+ if
(brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() ||
!isRetry) {
getMessageResult.addMessage(mapedBuffer);
} else {
List<MessageExt> messageExtList =
MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f5f0db1016..a4d82d1c53 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -381,6 +381,11 @@ public class BrokerConfig extends BrokerIdentity {
*/
private long fetchNamesrvAddrInterval = 10 * 1000;
+ /**
+ * Pop response returns the actual retry topic rather than tampering with
the original topic
+ */
+ private boolean popResponseReturnActualRetryTopic = false;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1676,4 +1681,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setFetchNamesrvAddrInterval(final long
fetchNamesrvAddrInterval) {
this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
}
+
+ public boolean isPopResponseReturnActualRetryTopic() {
+ return popResponseReturnActualRetryTopic;
+ }
+
+ public void setPopResponseReturnActualRetryTopic(boolean
popResponseReturnActualRetryTopic) {
+ this.popResponseReturnActualRetryTopic =
popResponseReturnActualRetryTopic;
+ }
}