This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9897e74a6c38200f6200c7bc69c7a118ec4ad7fd Author: zhouxiang <[email protected]> AuthorDate: Fri Nov 18 15:46:44 2022 +0800 [ISSUE #5406] Overwrite sysFlag to broker --- .../src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java | 4 ++++ .../apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java index 20b8ad208..15d56dde7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java @@ -77,6 +77,10 @@ public class PullSysFlag { return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION; } + public static int buildSysFlagWithSubscription(final int sysFlag) { + return sysFlag | FLAG_SUBSCRIPTION; + } + public static boolean hasClassFilterFlag(final int sysFlag) { return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java index 873b52460..eb744676a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java @@ -39,7 +39,8 @@ public class PullMessageActivity extends AbstractRemotingActivity { protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); - if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) { + int sysFlag = requestHeader.getSysFlag(); + if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) { ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (consumerInfo == null) { return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST, @@ -50,6 +51,7 @@ public class PullMessageActivity extends AbstractRemotingActivity { return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST, "the consumer's subscription not exist"); } + requestHeader.setSysFlag(PullSysFlag.buildSysFlagWithSubscription(sysFlag)); requestHeader.setSubscription(subscriptionData.getSubString()); requestHeader.setExpressionType(subscriptionData.getExpressionType()); request.writeCustomHeader(requestHeader);
