This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9897e74a6c38200f6200c7bc69c7a118ec4ad7fd
Author: zhouxiang <[email protected]>
AuthorDate: Fri Nov 18 15:46:44 2022 +0800

    [ISSUE #5406] Overwrite sysFlag to broker
---
 .../src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java | 4 ++++
 .../apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java  | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index 20b8ad208..15d56dde7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -77,6 +77,10 @@ public class PullSysFlag {
         return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
     }
 
+    public static int buildSysFlagWithSubscription(final int sysFlag) {
+        return sysFlag | FLAG_SUBSCRIPTION;
+    }
+
     public static boolean hasClassFilterFlag(final int sysFlag) {
         return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index 873b52460..eb744676a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -39,7 +39,8 @@ public class PullMessageActivity extends AbstractRemotingActivity {
     protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
         ProxyContext context) throws Exception {
         PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-        if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
+        int sysFlag = requestHeader.getSysFlag();
+        if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) {
             ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
             if (consumerInfo == null) {
                 return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
@@ -50,6 +51,7 @@ public class PullMessageActivity extends AbstractRemotingActivity {
                 return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST,
                     "the consumer's subscription not exist");
             }
+            requestHeader.setSysFlag(PullSysFlag.buildSysFlagWithSubscription(sysFlag));
             requestHeader.setSubscription(subscriptionData.getSubString());
             requestHeader.setExpressionType(subscriptionData.getExpressionType());
             request.writeCustomHeader(requestHeader);

Reply via email to