This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8086fc545d [ISSUE #9152] Broker getConsumeStats supports inputting
multiple topics (#9153)
8086fc545d is described below
commit 8086fc545d1e403c52d770a15e7be7c247e849b0
Author: qianye <[email protected]>
AuthorDate: Wed Feb 5 14:41:53 2025 +0800
[ISSUE #9152] Broker getConsumeStats supports inputting multiple topics
(#9153)
* [ISSUE #9152] The getConsumeStats supports inputting multiple topics
---
.../broker/processor/AdminBrokerProcessor.java | 60 ++++++----
.../rocketmq/client/impl/MQClientAPIImpl.java | 15 ++-
.../ExportRocksDBConfigToJsonRequestHeader.java | 3 +-
.../header/GetConsumeStatsRequestHeader.java | 37 ++++++-
.../header/GetConsumeStatsRequestHeaderTest.java | 123 +++++++++++++++++++++
.../tools/admin/DefaultMQAdminExtTest.java | 2 +-
6 files changed, 213 insertions(+), 27 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a9b913192f..2247e90f56 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1947,16 +1947,14 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
try {
final GetConsumeStatsRequestHeader requestHeader =
request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
- ConsumeStats consumeStats = new ConsumeStats();
+ List<String> topicListProvided = requestHeader.fetchTopicList();
+ String topicProvided = requestHeader.getTopic();
+ String group = requestHeader.getConsumerGroup();
- Set<String> topics = new HashSet<>();
- if (UtilAll.isBlank(requestHeader.getTopic())) {
- topics =
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
- } else {
- topics.add(requestHeader.getTopic());
- }
+ ConsumeStats consumeStats = new ConsumeStats();
+ Set<String> topicsForCollecting =
getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group);
- for (String topic : topics) {
+ for (String topic : topicsForCollecting) {
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic
config does not exist, topic={}", topic);
@@ -1964,20 +1962,6 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
-
- {
- SubscriptionData findSubscriptionData =
-
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(),
topic);
-
- if (null == findSubscriptionData
- &&
this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup())
> 0) {
- LOGGER.warn(
- "AdminBrokerProcessor#getConsumeStats: topic does
not exist in consumer group's subscription, "
- + "topic={}, consumer group={}", topic,
requestHeader.getConsumerGroup());
- continue;
- }
- }
-
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
@@ -2038,6 +2022,38 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
+ private Set<String> getTopicsForCollectingConsumeStats(List<String>
topicListProvided, String topicProvided,
+ String group) {
+ Set<String> topicsForCollecting = new HashSet<>();
+ if (!topicListProvided.isEmpty()) {
+ // if topic list is provided, only collect the topics in the list
+ // and ignore subscription check
+ topicsForCollecting.addAll(topicListProvided);
+ } else {
+ // In order to be compatible with the old logic,
+ // even if the topic has been provided here, the subscription will
be checked.
+ if (UtilAll.isBlank(topicProvided)) {
+ topicsForCollecting.addAll(
+
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
+ } else {
+ topicsForCollecting.add(topicProvided);
+ }
+ int subscriptionCount =
this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
+ Iterator<String> iterator = topicsForCollecting.iterator();
+ while (iterator.hasNext()) {
+ String topic = iterator.next();
+ SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
+ if (findSubscriptionData == null && subscriptionCount > 0) {
+ LOGGER.warn(
+ "AdminBrokerProcessor#getConsumeStats: topic does not
exist in consumer group's subscription, topic={}, consumer group={}",
+ topic, group);
+ iterator.remove();
+ }
+ }
+ }
+ return topicsForCollecting;
+ }
+
private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx,
RemotingCommand request) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 114093e350..bed6c1c476 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1748,16 +1748,27 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
public ConsumeStats getConsumeStats(final String addr, final String
consumerGroup, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
- return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
+ return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
+ }
+
+ public ConsumeStats getConsumeStats(final String addr, final String
consumerGroup, final List<String> topicList,
+ final long timeoutMillis) throws RemotingSendRequestException,
RemotingConnectException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+ return getConsumeStats(addr, consumerGroup, null, topicList,
timeoutMillis);
}
public ConsumeStats getConsumeStats(final String addr, final String
consumerGroup, final String topic,
- final long timeoutMillis)
+ final long timeoutMillis) throws RemotingSendRequestException,
RemotingConnectException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+ return getConsumeStats(addr, consumerGroup, topic, null,
timeoutMillis);
+ }
+
+ public ConsumeStats getConsumeStats(final String addr, final String
consumerGroup, final String topic,
+ final List<String> topicList, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new
GetConsumeStatsRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
+ requestHeader.updateTopicList(topicList);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS,
requestHeader);
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
index 7b1f9470e1..8354f83053 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
@@ -21,12 +21,13 @@ import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;
-@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action =
Action.GET)
+@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource =
ResourceType.CLUSTER, action = Action.GET)
public class ExportRocksDBConfigToJsonRequestHeader implements
CommandCustomHeader {
private static final String CONFIG_TYPE_SEPARATOR = ";";
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
index 51a46879e8..2c51c3f529 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
@@ -17,27 +17,62 @@
package org.apache.rocketmq.remoting.protocol.header;
import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.common.resource.RocketMQResource;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
+ private static final String TOPIC_NAME_SEPARATOR = ";";
+
@CFNotNull
@RocketMQResource(ResourceType.GROUP)
private String consumerGroup;
+
@RocketMQResource(ResourceType.TOPIC)
private String topic;
+ // if topicList is provided, topic will be ignored
+ @RocketMQResource(value = ResourceType.TOPIC, splitter =
TOPIC_NAME_SEPARATOR)
+ private String topicList;
+
@Override
public void checkFields() throws RemotingCommandException {
}
+ public List<String> fetchTopicList() {
+ if (StringUtils.isBlank(topicList)) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(StringUtils.split(topicList,
TOPIC_NAME_SEPARATOR));
+ }
+
+ public void updateTopicList(List<String> topicList) {
+ if (topicList == null || topicList.isEmpty()) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ topicList.forEach(topic ->
sb.append(topic).append(TOPIC_NAME_SEPARATOR));
+ this.setTopicList(sb.toString());
+ }
+
+ public String getTopicList() {
+ return topicList;
+ }
+
+ public void setTopicList(String topicList) {
+ this.topicList = topicList;
+ }
+
public String getConsumerGroup() {
return consumerGroup;
}
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java
new file mode 100644
index 0000000000..8004305e17
--- /dev/null
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class GetConsumeStatsRequestHeaderTest {
+
+ private GetConsumeStatsRequestHeader header;
+
+ @Before
+ public void setUp() {
+ header = new GetConsumeStatsRequestHeader();
+ }
+
+ @Test
+ public void updateTopicList_NullTopicList_DoesNotUpdate() {
+ header.updateTopicList(null);
+ assertNull(header.getTopicList());
+ }
+
+ @Test
+ public void updateTopicList_EmptyTopicList_SetsEmptyString() {
+ header.updateTopicList(Collections.emptyList());
+ assertNull(header.getTopicList());
+ }
+
+ @Test
+ public void updateTopicList_SingleTopic_SetsSingleTopicString() {
+ List<String> topicList = Collections.singletonList("TopicA");
+ header.updateTopicList(topicList);
+ assertEquals("TopicA;", header.getTopicList());
+ }
+
+ @Test
+ public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() {
+ List<String> topicList = Arrays.asList("TopicA", "TopicB", "TopicC");
+ header.updateTopicList(topicList);
+ assertEquals("TopicA;TopicB;TopicC;", header.getTopicList());
+ }
+
+ @Test
+ public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() {
+ List<String> topicList = Arrays.asList("TopicA", "TopicA", "TopicB");
+ header.updateTopicList(topicList);
+ assertEquals("TopicA;TopicA;TopicB;", header.getTopicList());
+ }
+
+ @Test
+ public void fetchTopicList_NullTopicList_ReturnsEmptyList() {
+ header.setTopicList(null);
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Collections.emptyList(), topicList);
+
+ header.updateTopicList(new ArrayList<>());
+ topicList = header.fetchTopicList();
+ assertEquals(Collections.emptyList(), topicList);
+ }
+
+ @Test
+ public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() {
+ header.setTopicList("");
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Collections.emptyList(), topicList);
+ }
+
+ @Test
+ public void fetchTopicList_BlankTopicList_ReturnsEmptyList() {
+ header.setTopicList(" ");
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Collections.emptyList(), topicList);
+ }
+
+ @Test
+ public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() {
+ header.setTopicList("TopicA");
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Collections.singletonList("TopicA"), topicList);
+ }
+
+ @Test
+ public void fetchTopicList_MultipleTopics_ReturnsTopicList() {
+ header.setTopicList("TopicA;TopicB;TopicC");
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList);
+ }
+
+ @Test
+ public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() {
+ header.setTopicList("TopicA;TopicB;");
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
+ }
+
+ @Test
+ public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList()
{
+ header.setTopicList(";TopicA;TopicB");
+ List<String> topicList = header.fetchTopicList();
+ assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
+ }
+}
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index dc5642f88c..ec5f7571d2 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -456,7 +456,7 @@ public class DefaultMQAdminExtTest {
connection.setConnectionSet(connections);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(),
anyString(), anyLong())).thenReturn(connection);
ConsumeStats consumeStats = new ConsumeStats();
- when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(),
isNull(), anyLong())).thenReturn(consumeStats);
+ when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(),
(String) isNull(), anyLong())).thenReturn(consumeStats);
List<MessageTrack> broadcastMessageTracks =
defaultMQAdminExt.messageTrackDetail(messageExt);
assertThat(broadcastMessageTracks.size()).isEqualTo(2);
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);