Repository: incubator-rocketmq Updated Branches: refs/heads/master [created] 057d0e9b1
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java new file mode 100644 index 0000000..b8176d4 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.slf4j.Logger; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; + + +public class DefaultMonitorListener implements MonitorListener { + private final static String LOG_PREFIX = "[MONITOR] "; + private final static String LOG_NOTIFY = LOG_PREFIX + " [NOTIFY] "; + private final Logger log = ClientLogger.getLog(); + + + public DefaultMonitorListener() { + } + + + @Override + public void beginRound() { + log.info(LOG_PREFIX + "=========================================beginRound"); + } + + + @Override + public void reportUndoneMsgs(UndoneMsgs undoneMsgs) { + log.info(String.format(LOG_PREFIX + "reportUndoneMsgs: %s", undoneMsgs)); + } + + + @Override + public void reportFailedMsgs(FailedMsgs failedMsgs) { + log.info(String.format(LOG_PREFIX + "reportFailedMsgs: %s", failedMsgs)); + } + + + @Override + public void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent) { + log.info(String.format(LOG_PREFIX + "reportDeleteMsgsEvent: %s", deleteMsgsEvent)); + } + + + @Override + public void reportConsumerRunningInfo(TreeMap<String, ConsumerRunningInfo> criTable) { + + { + boolean result = ConsumerRunningInfo.analyzeSubscription(criTable); + if (!result) { + log.info(String.format(LOG_NOTIFY + + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different", criTable + .firstEntry().getValue().getProperties().getProperty("consumerGroup"))); + } + } + + + { + Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerRunningInfo> next = it.next(); + String result = ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue()); + if (!result.isEmpty()) { + log.info(String.format(LOG_NOTIFY + + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId: %s, %s", + criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup"), + next.getKey(), + result)); + } + } + } + } + + + @Override + public void endRound() { + log.info(LOG_PREFIX + "=========================================endRound"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java new file mode 100644 index 0000000..25ac420 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent; + + +public class DeleteMsgsEvent { + private OffsetMovedEvent offsetMovedEvent; + private long eventTimestamp; + + + public OffsetMovedEvent getOffsetMovedEvent() { + return offsetMovedEvent; + } + + + public void setOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) { + this.offsetMovedEvent = offsetMovedEvent; + } + + + public long getEventTimestamp() { + return eventTimestamp; + } + + + public void setEventTimestamp(long eventTimestamp) { + this.eventTimestamp = eventTimestamp; + } + + + @Override + public String toString() { + return "DeleteMsgsEvent [offsetMovedEvent=" + offsetMovedEvent + ", eventTimestamp=" + eventTimestamp + + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java new file mode 100644 index 0000000..3ae5c2f --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +public class FailedMsgs { + private String consumerGroup; + private String topic; + private long failedMsgsTotalRecently; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public long getFailedMsgsTotalRecently() { + return failedMsgsTotalRecently; + } + + + public void setFailedMsgsTotalRecently(long failedMsgsTotalRecently) { + this.failedMsgsTotalRecently = failedMsgsTotalRecently; + } + + + @Override + public String toString() { + return "FailedMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic + + ", failedMsgsTotalRecently=" + failedMsgsTotalRecently + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java new file mode 100644 index 0000000..7ef4513 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +import com.alibaba.rocketmq.common.MixAll; + + +public class MonitorConfig { + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, + System.getenv(MixAll.NAMESRV_ADDR_ENV)); + + private int roundInterval = 1000 * 60; + + + public String getNamesrvAddr() { + return namesrvAddr; + } + + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + + public int getRoundInterval() { + return roundInterval; + } + + + public void setRoundInterval(int roundInterval) { + this.roundInterval = roundInterval; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java new file mode 100644 index 0000000..1586ef9 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; + +import java.util.TreeMap; + +public interface MonitorListener { + void beginRound(); + + void reportUndoneMsgs(UndoneMsgs undoneMsgs); + + void reportFailedMsgs(FailedMsgs failedMsgs); + + void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent); + + void reportConsumerRunningInfo(TreeMap<String/* clientId */, ConsumerRunningInfo> criTable); + + void endRound(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java new file mode 100644 index 0000000..2b50862 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.admin.OffsetWrapper; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.Connection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; +import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +public class MonitorService { + private final Logger log = ClientLogger.getLog(); + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService")); + + private final MonitorConfig monitorConfig; + + private final MonitorListener monitorListener; + + private final DefaultMQAdminExt defaultMQAdminExt; + private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( + MixAll.TOOLS_CONSUMER_GROUP); + private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer( + MixAll.MONITOR_CONSUMER_GROUP); + + + public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) { + this.monitorConfig = monitorConfig; + this.monitorListener = monitorListener; + + this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + this.defaultMQAdminExt.setInstanceName(instanceName()); + this.defaultMQAdminExt.setNamesrvAddr(monitorConfig.getNamesrvAddr()); + + this.defaultMQPullConsumer.setInstanceName(instanceName()); + this.defaultMQPullConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr()); + + this.defaultMQPushConsumer.setInstanceName(instanceName()); + this.defaultMQPushConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr()); + try { + this.defaultMQPushConsumer.setConsumeThreadMin(1); + this.defaultMQPushConsumer.setConsumeThreadMax(1); + this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*"); + this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + try { + OffsetMovedEvent ome = + OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class); + + DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent(); + deleteMsgsEvent.setOffsetMovedEvent(ome); + deleteMsgsEvent.setEventTimestamp(msgs.get(0).getStoreTimestamp()); + + MonitorService.this.monitorListener.reportDeleteMsgsEvent(deleteMsgsEvent); + } catch (Exception e) { + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + } catch (MQClientException e) { + } + } + + + private String instanceName() { + String name = + System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr(); + + return "MonitorService_" + name.hashCode(); + } + + public static void main(String[] args) throws MQClientException { + main0(args, null); + } + + public static void main0(String[] args, RPCHook rpcHook) throws MQClientException { + final MonitorService monitorService = + new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook); + monitorService.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + private volatile boolean hasShutdown = false; + + + @Override + public void run() { + synchronized (this) { + if (!this.hasShutdown) { + this.hasShutdown = true; + monitorService.shutdown(); + } + } + } + }, "ShutdownHook")); + } + + public void start() throws MQClientException { + this.defaultMQPullConsumer.start(); + this.defaultMQAdminExt.start(); + this.defaultMQPushConsumer.start(); + this.startScheduleTask(); + } + + public void shutdown() { + this.defaultMQPullConsumer.shutdown(); + this.defaultMQAdminExt.shutdown(); + this.defaultMQPushConsumer.shutdown(); + } + + private void startScheduleTask() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + MonitorService.this.doMonitorWork(); + } catch (Exception e) { + log.error("doMonitorWork Exception", e); + } + } + }, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS); + } + + public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException { + long beginTime = System.currentTimeMillis(); + this.monitorListener.beginRound(); + + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + + try { + this.reportUndoneMsgs(consumerGroup); + } catch (Exception e) { + // log.error("reportUndoneMsgs Exception", e); + } + + + try { + this.reportConsumerRunningInfo(consumerGroup); + } catch (Exception e) { + // log.error("reportConsumerRunningInfo Exception", e); + } + } + } + this.monitorListener.endRound(); + long spentTimeMills = System.currentTimeMillis() - beginTime; + log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills); + } + + private void reportUndoneMsgs(final String consumerGroup) { + ConsumeStats cs = null; + try { + cs = defaultMQAdminExt.examineConsumeStats(consumerGroup); + } catch (Exception e) { + return; + } + + ConsumerConnection cc = null; + try { + cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup); + } catch (Exception e) { + return; + } + + if (cs != null) { + + HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String, ConsumeStats>(); + { + Iterator<Entry<MessageQueue, OffsetWrapper>> it = cs.getOffsetTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, OffsetWrapper> next = it.next(); + MessageQueue mq = next.getKey(); + OffsetWrapper ow = next.getValue(); + ConsumeStats csTmp = csByTopic.get(mq.getTopic()); + if (null == csTmp) { + csTmp = new ConsumeStats(); + csByTopic.put(mq.getTopic(), csTmp); + } + + csTmp.getOffsetTable().put(mq, ow); + } + } + + + { + Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumeStats> next = it.next(); + UndoneMsgs undoneMsgs = new UndoneMsgs(); + undoneMsgs.setConsumerGroup(consumerGroup); + undoneMsgs.setTopic(next.getKey()); + this.computeUndoneMsgs(undoneMsgs, next.getValue()); + this.monitorListener.reportUndoneMsgs(undoneMsgs); + this.reportFailedMsgs(consumerGroup, next.getKey()); + } + } + } + } + + public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException, + MQBrokerException, RemotingException, MQClientException { + ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup); + TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>(); + for (Connection c : cc.getConnectionSet()) { + String clientId = c.getClientId(); + + if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { + continue; + } + + try { + ConsumerRunningInfo info = + defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false); + infoMap.put(clientId, info); + } catch (Exception e) { + } + } + + if (!infoMap.isEmpty()) { + this.monitorListener.reportConsumerRunningInfo(infoMap); + } + } + + private void computeUndoneMsgs(final UndoneMsgs undoneMsgs, final ConsumeStats consumeStats) { + long total = 0; + long singleMax = 0; + long delayMax = 0; + Iterator<Entry<MessageQueue, OffsetWrapper>> it = consumeStats.getOffsetTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, OffsetWrapper> next = it.next(); + MessageQueue mq = next.getKey(); + OffsetWrapper ow = next.getValue(); + long diff = ow.getBrokerOffset() - ow.getConsumerOffset(); + + if (diff > singleMax) { + singleMax = diff; + } + + if (diff > 0) { + total += diff; + } + + // Delay + if (ow.getLastTimestamp() > 0) { + try { + long maxOffset = this.defaultMQPullConsumer.maxOffset(mq); + if (maxOffset > 0) { + PullResult pull = this.defaultMQPullConsumer.pull(mq, "*", maxOffset - 1, 1); + switch (pull.getPullStatus()) { + case FOUND: + long delay = + pull.getMsgFoundList().get(0).getStoreTimestamp() - ow.getLastTimestamp(); + if (delay > delayMax) { + delayMax = delay; + } + break; + case NO_MATCHED_MSG: + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + break; + default: + break; + } + } + } catch (Exception e) { + } + } + } + + undoneMsgs.setUndoneMsgsTotal(total); + undoneMsgs.setUndoneMsgsSingleMQ(singleMax); + undoneMsgs.setUndoneMsgsDelayTimeMills(delayMax); + } + + private void reportFailedMsgs(final String consumerGroup, final String topic) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java new file mode 100644 index 0000000..1638d14 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.monitor; + +public class UndoneMsgs { + private String consumerGroup; + private String topic; + + private long undoneMsgsTotal; + + private long undoneMsgsSingleMQ; + + private long undoneMsgsDelayTimeMills; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public long getUndoneMsgsTotal() { + return undoneMsgsTotal; + } + + + public void setUndoneMsgsTotal(long undoneMsgsTotal) { + this.undoneMsgsTotal = undoneMsgsTotal; + } + + + public long getUndoneMsgsSingleMQ() { + return undoneMsgsSingleMQ; + } + + + public void setUndoneMsgsSingleMQ(long undoneMsgsSingleMQ) { + this.undoneMsgsSingleMQ = undoneMsgsSingleMQ; + } + + + public long getUndoneMsgsDelayTimeMills() { + return undoneMsgsDelayTimeMills; + } + + + public void setUndoneMsgsDelayTimeMills(long undoneMsgsDelayTimeMills) { + this.undoneMsgsDelayTimeMills = undoneMsgsDelayTimeMills; + } + + + @Override + public String toString() { + return "UndoneMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic + ", undoneMsgsTotal=" + + undoneMsgsTotal + ", undoneMsgsSingleMQ=" + undoneMsgsSingleMQ + + ", undoneMsgsDelayTimeMills=" + undoneMsgsDelayTimeMills + "]"; + } +}
