This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dev-27 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 883418d8a30fd702529591e4687c415454a75932 Author: RongtongJin <[email protected]> AuthorDate: Mon Dec 5 15:23:15 2022 +0800 Fix bug that make static topic ITs unable to pass --- .../broker/processor/ConsumerManageProcessor.java | 17 ++++++++--------- .../client/consumer/store/RemoteBrokerOffsetStore.java | 1 - 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 53bb5b9f8..395102c7e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -110,7 +110,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) { + public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, + final TopicQueueMappingContext mappingContext) { try { if (mappingContext.getMappingDetail() == null) { return null; @@ -140,7 +141,6 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { } } - private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { @@ -201,8 +201,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { return response; } - - public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, + TopicQueueMappingContext mappingContext) { try { if (mappingContext.getMappingDetail() == null) { return null; @@ -213,7 +213,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { } List<LogicQueueMappingItem> mappingItemList = mappingContext.getMappingItemList(); if (mappingItemList.size() == 1 - && mappingItemList.get(0).getLogicOffset() == 0) { + && mappingItemList.get(0).getLogicOffset() == 0) { //as physical, just let it go mappingContext.setCurrentItem(mappingItemList.get(0)); requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId()); @@ -277,8 +277,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { } } - - public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader, + public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, + final QueryConsumerOffsetResponseHeader responseHeader, final TopicQueueMappingContext mappingContext, final int code) { try { if (mappingContext.getMappingDetail() == null) { @@ -306,9 +306,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); - TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); - RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 21c7cd3a0..1b9cd63db 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -240,7 +240,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore { requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setBname(mq.getBrokerName()); - requestHeader.setSetZeroIfNotFound(false); return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
