This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 71a7a659be [ISSUE #7543] only call a single type of retry topic in pop (#7665)
71a7a659be is described below
commit 71a7a659bed15110d1146091bfb7a51d28ade562
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Dec 15 16:09:11 2023 +0800
[ISSUE #7543] only call a single type of retry topic in pop (#7665)
* only call a single type of retry topic in pop
---
.../broker/processor/PopMessageProcessor.java | 85 ++++++++++++++--------
.../broker/processor/PopMessageProcessorTest.java | 2 +-
2 files changed, 55 insertions(+), 32 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 58baecc05a..5d86ecc0cd 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -351,27 +351,42 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
ExpressionMessageFilter finalMessageFilter = messageFilter;
StringBuilder finalOrderCountInfo = orderCountInfo;
+ // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo,
+ // a single POP request could only invoke the popMsgFromQueue method once
+ // for either a normal topic or a retry topic's queue. Retry topics v1 and v2 are
+ // considered the same type because they share the same retry flag in previous fields.
+ // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request,
+ // only one type of retry topic is able to call popMsgFromQueue.
boolean needRetry = randomQ % 5 == 0;
+ boolean needRetryV1 = false;
+ if
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ needRetryV1 = randomQ % 2 == 0;
+ }
long popTime = System.currentTimeMillis();
CompletableFuture<Long> getMessageFuture =
CompletableFuture.completedFuture(0L);
if (needRetry && !requestHeader.isOrder()) {
- TopicConfig retryTopicConfig =
-
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()));
- if (retryTopicConfig != null) {
- for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
- int queueId = (randomQ + i) %
retryTopicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
- startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
- }
- }
- if
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ if (needRetryV1) {
TopicConfig retryTopicConfigV1 =
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
requestHeader.getConsumerGroup()));
if (retryTopicConfigV1 != null) {
for (int i = 0; i < retryTopicConfigV1.getReadQueueNums();
i++) {
int queueId = (randomQ + i) %
retryTopicConfigV1.getReadQueueNums();
- getMessageFuture =
getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
- startOffsetInfo, msgOffsetInfo,
finalOrderCountInfo));
+ getMessageFuture =
getMessageFuture.thenCompose(restNum ->
+ popMsgFromQueue(retryTopicConfigV1.getTopicName(),
requestHeader.getAttemptId(), true,
+ getMessageResult, requestHeader, queueId,
restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo,
finalOrderCountInfo));
+ }
+ }
+ } else {
+ TopicConfig retryTopicConfig =
+
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()));
+ if (retryTopicConfig != null) {
+ for (int i = 0; i < retryTopicConfig.getReadQueueNums();
i++) {
+ int queueId = (randomQ + i) %
retryTopicConfig.getReadQueueNums();
+ getMessageFuture =
getMessageFuture.thenCompose(restNum ->
+ popMsgFromQueue(retryTopicConfig.getTopicName(),
requestHeader.getAttemptId(), true,
+ getMessageResult, requestHeader, queueId,
restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo,
finalOrderCountInfo));
}
}
}
@@ -380,33 +395,42 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
// read all queue
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
- startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+ getMessageFuture = getMessageFuture.thenCompose(restNum ->
+ popMsgFromQueue(topicConfig.getTopicName(),
requestHeader.getAttemptId(), false,
+ getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
} else {
int queueId = requestHeader.getQueueId();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
- startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+ getMessageFuture = getMessageFuture.thenCompose(restNum ->
+ popMsgFromQueue(topicConfig.getTopicName(),
requestHeader.getAttemptId(), false,
+ getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
// if not full , fetch retry again
if (!needRetry && getMessageResult.getMessageMapedList().size() <
requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
- TopicConfig retryTopicConfig =
-
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()));
- if (retryTopicConfig != null) {
- for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
- int queueId = (randomQ + i) %
retryTopicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
- startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
- }
- }
- if
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ if (needRetryV1) {
TopicConfig retryTopicConfigV1 =
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
requestHeader.getConsumerGroup()));
if (retryTopicConfigV1 != null) {
for (int i = 0; i < retryTopicConfigV1.getReadQueueNums();
i++) {
int queueId = (randomQ + i) %
retryTopicConfigV1.getReadQueueNums();
- getMessageFuture =
getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
- startOffsetInfo, msgOffsetInfo,
finalOrderCountInfo));
+ getMessageFuture =
getMessageFuture.thenCompose(restNum ->
+ popMsgFromQueue(retryTopicConfigV1.getTopicName(),
requestHeader.getAttemptId(), true,
+ getMessageResult, requestHeader, queueId,
restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo,
finalOrderCountInfo));
+ }
+ }
+ } else {
+ TopicConfig retryTopicConfig =
+
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()));
+ if (retryTopicConfig != null) {
+ for (int i = 0; i < retryTopicConfig.getReadQueueNums();
i++) {
+ int queueId = (randomQ + i) %
retryTopicConfig.getReadQueueNums();
+ getMessageFuture =
getMessageFuture.thenCompose(restNum ->
+ popMsgFromQueue(retryTopicConfig.getTopicName(),
requestHeader.getAttemptId(), true,
+ getMessageResult, requestHeader, queueId,
restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo,
finalOrderCountInfo));
}
}
}
@@ -489,12 +513,11 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return null;
}
- private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean
isRetry, GetMessageResult getMessageResult,
+ private CompletableFuture<Long> popMsgFromQueue(String targetTopic, String
attemptId, boolean isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int
reviveQid,
Channel channel, long popTime, ExpressionMessageFilter messageFilter,
StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
- String topic = isRetry ?
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
- requestHeader.getConsumerGroup()) : requestHeader.getTopic();
+ String topic = targetTopic;
String lockKey =
topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() +
PopAckConstants.SPLIT + queueId;
boolean isOrder = requestHeader.isOrder();
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index 44f04066ca..d8c8fa1034 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -76,7 +76,7 @@ public class PopMessageProcessorTest {
brokerController.getBrokerConfig().setEnablePopBufferMerge(true);
popMessageProcessor = new PopMessageProcessor(brokerController);
when(handlerContext.channel()).thenReturn(embeddedChannel);
-
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig());
+
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig(topic));
clientChannelInfo = new ClientChannelInfo(embeddedChannel);
ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(