This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7c24daf205 [ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
7c24daf205 is described below
commit 7c24daf205cb25068049fd29a02bff89e2b81b3b
Author: mxsm <[email protected]>
AuthorDate: Sun May 21 11:15:55 2023 +0800
[ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
---
.../rocketmq/broker/processor/PullMessageProcessor.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 9286cf913d..8df2265c2c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -555,6 +555,15 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
return consumeMessageHookList != null &&
!this.consumeMessageHookList.isEmpty();
}
+ /**
+ * Composes the header of the response message to be sent back to the client
+ * @param requestHeader - the header of the request message
+ * @param getMessageResult - the result of the GetMessage request
+ * @param topicSysFlag - the system flag of the topic
+ * @param subscriptionGroupConfig - configuration of the subscription group
+ * @param response - the response message to be sent back to the client
+ * @param clientAddress - the address of the client
+ */
protected void composeResponseHeader(PullMessageRequestHeader
requestHeader, GetMessageResult getMessageResult,
int topicSysFlag, SubscriptionGroupConfig subscriptionGroupConfig,
RemotingCommand response,
String clientAddress) {
@@ -572,6 +581,7 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
+ case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
@@ -590,10 +600,8 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
- case NO_MATCHED_MESSAGE:
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- break;
case OFFSET_FOUND_NULL:
+ case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
@@ -602,9 +610,6 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
LOGGER.info("the request offset: {} over flow badly, fix to
{}, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(), getMessageResult.getMaxOffset(),
clientAddress);
break;
- case OFFSET_OVERFLOW_ONE:
- response.setCode(ResponseCode.PULL_NOT_FOUND);
- break;
case OFFSET_RESET:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOGGER.info("The queue under pulling was previously reset to
start from {}",