This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dev-27
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 883418d8a30fd702529591e4687c415454a75932
Author: RongtongJin <[email protected]>
AuthorDate: Mon Dec 5 15:23:15 2022 +0800

    Fix bug that make static topic ITs unable to pass
---
 .../broker/processor/ConsumerManageProcessor.java       | 17 ++++++++---------
 .../client/consumer/store/RemoteBrokerOffsetStore.java  |  1 -
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 53bb5b9f8..395102c7e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -110,7 +110,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    public  RemotingCommand rewriteRequestForStaticTopic(final 
UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext 
mappingContext) {
+    public RemotingCommand rewriteRequestForStaticTopic(final 
UpdateConsumerOffsetRequestHeader requestHeader,
+        final TopicQueueMappingContext mappingContext) {
         try {
             if (mappingContext.getMappingDetail() == null) {
                 return null;
@@ -140,7 +141,6 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         }
     }
 
-
     private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, 
RemotingCommand request)
         throws RemotingCommandException {
 
@@ -201,8 +201,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         return response;
     }
 
-
-    public  RemotingCommand 
rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, 
TopicQueueMappingContext mappingContext) {
+    public RemotingCommand 
rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader,
+        TopicQueueMappingContext mappingContext) {
         try {
             if (mappingContext.getMappingDetail() == null) {
                 return null;
@@ -213,7 +213,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
             }
             List<LogicQueueMappingItem> mappingItemList = 
mappingContext.getMappingItemList();
             if (mappingItemList.size() == 1
-                    &&  mappingItemList.get(0).getLogicOffset() == 0) {
+                && mappingItemList.get(0).getLogicOffset() == 0) {
                 //as physical, just let it go
                 mappingContext.setCurrentItem(mappingItemList.get(0));
                 
requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
@@ -277,8 +277,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         }
     }
 
-
-    public  RemotingCommand rewriteResponseForStaticTopic(final 
QueryConsumerOffsetRequestHeader requestHeader, final 
QueryConsumerOffsetResponseHeader responseHeader,
+    public RemotingCommand rewriteResponseForStaticTopic(final 
QueryConsumerOffsetRequestHeader requestHeader,
+        final QueryConsumerOffsetResponseHeader responseHeader,
         final TopicQueueMappingContext mappingContext, final int code) {
         try {
             if (mappingContext.getMappingDetail() == null) {
@@ -306,9 +306,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
             (QueryConsumerOffsetRequestHeader) request
                 
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
 
-
         TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
-        RemotingCommand rewriteResult  = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 21c7cd3a0..1b9cd63db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -240,7 +240,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             requestHeader.setConsumerGroup(this.groupName);
             requestHeader.setQueueId(mq.getQueueId());
             requestHeader.setBname(mq.getBrokerName());
-            requestHeader.setSetZeroIfNotFound(false);
 
             return 
this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                 findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);

Reply via email to