This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e59918704 add GET_CONSUMER_CONNECTION_LIST
e59918704 is described below

commit e5991870479bb7302c5908c04aa2d858c5e1f2fb
Author: lyx <[email protected]>
AuthorDate: Thu Mar 9 10:57:54 2023 +0800

    add GET_CONSUMER_CONNECTION_LIST
---
 .../proxy/remoting/RemotingProtocolServer.java     |  1 +
 .../remoting/activity/ConsumerManagerActivity.java | 48 ++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index b5c749d3b..85c960562 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -204,6 +204,7 @@ public class RemotingProtocolServer implements 
StartAndShutdown, RemotingProxyOu
         remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManagerActivity, this.updateOffsetExecutor);
         remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
consumerManagerActivity, this.updateOffsetExecutor);
         
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
consumerManagerActivity, this.updateOffsetExecutor);
+        
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_CONNECTION_LIST, 
consumerManagerActivity, this.updateOffsetExecutor);
 
         
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
consumerManagerActivity, this.defaultExecutor);
         remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, 
consumerManagerActivity, this.defaultExecutor);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index 1c1993ff0..e9d42afc2 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -17,17 +17,25 @@
 
 package org.apache.rocketmq.proxy.remoting.activity;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.Connection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
@@ -62,6 +70,9 @@ public class ConsumerManagerActivity extends 
AbstractRemotingActivity {
             case RequestCode.GET_EARLIEST_MSG_STORETIME: {
                 return request(ctx, request, context, 
Duration.ofSeconds(3).toMillis());
             }
+            case RequestCode.GET_CONSUMER_CONNECTION_LIST: {
+                return getConsumerConnectionList(ctx, request, context);
+            }
             default:
                 break;
         }
@@ -81,6 +92,43 @@ public class ConsumerManagerActivity extends 
AbstractRemotingActivity {
         return response;
     }
 
+    protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext 
ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class);
+        GetConsumerConnectionListRequestHeader header = 
(GetConsumerConnectionListRequestHeader) 
request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
+        ConsumerGroupInfo consumerGroupInfo = 
messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+        if (consumerGroupInfo != null) {
+            ConsumerConnection bodydata = new ConsumerConnection();
+            
bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
+            bodydata.setConsumeType(consumerGroupInfo.getConsumeType());
+            bodydata.setMessageModel(consumerGroupInfo.getMessageModel());
+            
bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable());
+
+            Iterator<Map.Entry<Channel, ClientChannelInfo>> it = 
consumerGroupInfo.getChannelInfoTable().entrySet().iterator();
+            while (it.hasNext()) {
+                ClientChannelInfo info = it.next().getValue();
+                Connection connection = new Connection();
+                connection.setClientId(info.getClientId());
+                connection.setLanguage(info.getLanguage());
+                connection.setVersion(info.getVersion());
+                
connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel()));
+
+                bodydata.getConnectionSet().add(connection);
+            }
+
+            byte[] body = bodydata.encode();
+            response.setBody(body);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+
+            return response;
+        }
+
+        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
+        response.setRemark("the consumer group[" + header.getConsumerGroup() + 
"] not online");
+        return response;
+    }
+
     protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, 
RemotingCommand request,
         ProxyContext context) throws Exception {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);

Reply via email to