This is an automated email from the ASF dual-hosted git repository. vongosling pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git
commit 378184de875742e83cbfc1e92eda98d25c85ef26 Author: breezecoolyang <[email protected]> AuthorDate: Mon Jun 3 10:47:05 2019 +0800 Avoid crash when get consumer offsets as a group of topic consuming --- .../rocketmq/exporter/task/MetricsCollectTask.java | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java index 3f359e9..6ae442d 100644 --- a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java +++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java @@ -106,17 +106,20 @@ public class MetricsCollectTask { GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); if (groupList != null && !groupList.getGroupList().isEmpty()) { for (String group : groupList.getGroupList()) { - ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group,topic); - Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet(); - for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) { - MessageQueue q = consumeStatusEntry.getKey(); - OffsetWrapper offset = consumeStatusEntry.getValue(); - if (consumeOffsetMap.containsKey(q.getBrokerName())) { - consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset()); - } - else { - consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset()); + try { + ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic); + Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet(); + for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) { + MessageQueue q = consumeStatusEntry.getKey(); + OffsetWrapper offset = consumeStatusEntry.getValue(); + if (consumeOffsetMap.containsKey(q.getBrokerName())) { + consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset()); + } else { + consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset()); + } } + } catch (Exception e) { + log.info("ignore this consumer", e.getMessage()); } Set<Map.Entry<String, Long>> consumeOffsetEntries = consumeOffsetMap.entrySet(); for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetEntries) {
