This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4b9816e4a911f656470c70b4528e3a0517aa2090 Author: zhouxiang <[email protected]> AuthorDate: Mon Oct 31 16:44:50 2022 +0800 [ISSUE #5406] Support getConsumerIdList --- .../remoting/activity/ConsumerManagerActivity.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index fb248a894..734b1dad1 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -20,11 +20,17 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.ChannelHandlerContext; import java.time.Duration; import java.util.ArrayList; +import java.util.List; import java.util.Set; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; @@ -64,8 +70,15 @@ public class ConsumerManagerActivity extends AbstractRemotingActivity { protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { - // TODO after connection-related module - return null; + RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); + GetConsumerListByGroupRequestHeader header = (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup()); + List<String> clientIds = consumerGroupInfo.getAllClientId(); + GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody(); + body.setConsumerIdList(clientIds); + response.setBody(body.encode()); + response.setCode(ResponseCode.SUCCESS); + return response; } protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
