http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java new file mode 100644 index 0000000..1de96db --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -0,0 +1,933 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.admin; + +import com.alibaba.rocketmq.client.QueryResult; +import com.alibaba.rocketmq.client.admin.MQAdminExtInner; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.impl.MQClientManager; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ServiceState; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.admin.*; +import com.alibaba.rocketmq.common.help.FAQUrl; +import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.common.namesrv.NamesrvUtil; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.body.*; +import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.QueueData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import com.alibaba.rocketmq.remoting.exception.*; +import com.alibaba.rocketmq.tools.admin.api.MessageTrack; +import com.alibaba.rocketmq.tools.admin.api.TrackType; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.io.UnsupportedEncodingException; +import java.util.*; +import java.util.Map.Entry; + + +/** + * @author shijia.wxr + */ +public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { + private final Logger log = ClientLogger.getLog(); + private final DefaultMQAdminExt defaultMQAdminExt; + private ServiceState serviceState = ServiceState.CREATE_JUST; + private MQClientInstance mqClientInstance; + private RPCHook rpcHook; + private long timeoutMillis = 20000; + private Random random = new Random(); + + + public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) { + this(defaultMQAdminExt, null, timeoutMillis); + } + + + public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, RPCHook rpcHook, long timeoutMillis) { + this.defaultMQAdminExt = defaultMQAdminExt; + this.rpcHook = rpcHook; + this.timeoutMillis = timeoutMillis; + } + + + @Override + public void start() throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + this.serviceState = ServiceState.START_FAILED; + + this.defaultMQAdminExt.changeInstanceNameToPID(); + + this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); + + boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup() + + "] has created already, specifed another name please."// + + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); + } + + mqClientInstance.start(); + + log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup()); + + this.serviceState = ServiceState.RUNNING; + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException("The AdminExt service state not OK, maybe started once, "// + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); + default: + break; + } + } + + + @Override + public void shutdown() { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + this.mqClientInstance.unregisterAdminExt(this.defaultMQAdminExt.getAdminExtGroup()); + this.mqClientInstance.shutdown(); + + log.info("the adminExt [{}] shutdown OK", this.defaultMQAdminExt.getAdminExtGroup()); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } + } + + @Override + public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { + this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, properties, timeoutMillis); + } + + @Override + public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { + return this.mqClientInstance.getMQClientAPIImpl().getBrokerConfig(brokerAddr, timeoutMillis); + } + + @Override + public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); + } + + @Override + public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis); + } + + @Override + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + // TODO Auto-generated method stub + return null; + } + + @Override + public TopicConfig examineTopicConfig(String addr, String topic) { + // TODO Auto-generated method stub + return null; + } + + @Override + public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + TopicStatsTable topicStatsTable = new TopicStatsTable(); + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis); + topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable()); + } + } + + if (topicStatsTable.getOffsetTable().isEmpty()) { + throw new MQClientException("Not found the topic stats info", null); + } + + return topicStatsTable; + } + + @Override + public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis); + } + + @Override + public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, timeoutMillis); + } + + @Override + public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException { + return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + return examineConsumeStats(consumerGroup, null); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, + InterruptedException, MQBrokerException { + String retryTopic = MixAll.getRetryTopic(consumerGroup); + TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic); + ConsumeStats result = new ConsumeStats(); + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + ConsumeStats consumeStats = + this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3); + result.getOffsetTable().putAll(consumeStats.getOffsetTable()); + double value = result.getConsumeTps() + consumeStats.getConsumeTps(); + result.setConsumeTps(value); + } + } + + if (result.getOffsetTable().isEmpty()) { + throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, + "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message"); + } + + return result; + } + + @Override + public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException { + return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis); + } + + @Override + public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); + } + + + @Override + public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + MessageDecoder.decodeMessageId(msgId); + return this.viewMessage(msgId); + } catch (Exception e) { + log.warn("the msgId maybe created by new client. msgId={}", msgId, e); + } + return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId); + } + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException, + RemotingException, MQClientException { + ConsumerConnection result = new ConsumerConnection(); + String topic = MixAll.getRetryTopic(consumerGroup); + List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas(); + BrokerData brokerData = brokers.get(random.nextInt(brokers.size())); + String addr = null; + if (brokerData != null) { + addr = brokerData.selectBrokerAddr(); + if (StringUtils.isNotBlank(addr)) { + result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis); + } + } + + if (result.getConnectionSet().isEmpty()) { + log.warn("the consumer group not online. brokerAddr={}, group={}", addr, consumerGroup); + throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group connection"); + } + + return result; + } + + @Override + public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException { + ProducerConnection result = new ProducerConnection(); + List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas(); + BrokerData brokerData = brokers.get(random.nextInt(brokers.size())); + String addr = null; + if (brokerData != null) { + addr = brokerData.selectBrokerAddr(); + if (StringUtils.isNotBlank(addr)) { + result = this.mqClientInstance.getMQClientAPIImpl().getProducerConnectionList(addr, producerGroup, timeoutMillis); + } + } + + if (result.getConnectionSet().isEmpty()) { + log.warn("the producer group not online. brokerAddr={}, group={}", addr, producerGroup); + throw new MQClientException("Not found the producer group connection", null); + } + + return result; + } + + @Override + public List<String> getNameServerAddressList() { + return this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList(); + } + + @Override + public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); + } + + @Override + public void putKVConfig(String namespace, String key, String value) { + // TODO Auto-generated method stub + + } + + @Override + public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, timeoutMillis); + } + + @Override + public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, timeoutMillis); + } + + @Override + public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + for (String addr : addrs) { + this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis); + } + } + + @Override + public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + if (addrs == null) { + String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr(); + addrs = new HashSet(Arrays.asList(ns.split(";"))); + } + for (String addr : addrs) { + this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis); + } + } + + @Override + public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis); + } + + @Override + public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, key, value, timeoutMillis); + } + + @Override + public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis); + } + + @Override + public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>(); + Map<String, Integer> topicRouteMap = new HashMap<String, Integer>(); + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + for (QueueData queueData : topicRouteData.getQueueDatas()) { + topicRouteMap.put(bd.selectBrokerAddr(), queueData.getReadQueueNums()); + } + } + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, timeoutMillis); + + boolean hasConsumed = false; + for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) { + MessageQueue queue = entry.getKey(); + OffsetWrapper offsetWrapper = entry.getValue(); + if (topic.equals(queue.getTopic())) { + hasConsumed = true; + RollbackStats rollbackStats = resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, force); + rollbackStatsList.add(rollbackStats); + } + } + + if (!hasConsumed) { + HashMap<MessageQueue, TopicOffset> topicStatus = + this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis).getOffsetTable(); + for (int i = 0; i < topicRouteMap.get(addr); i++) { + MessageQueue queue = new MessageQueue(topic, bd.getBrokerName(), i); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset()); + offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset()); + + RollbackStats rollbackStats = resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, force); + rollbackStatsList.add(rollbackStats); + } + } + } + } + return rollbackStatsList; + } + + @Override + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return resetOffsetByTimestamp(topic, group, timestamp, isForce, false); + } + + @Override + public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + try { + this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true); + } catch (MQClientException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + this.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, true); + return; + } + throw e; + } + } + + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas(); + Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>(); + if (brokerDatas != null) { + for (BrokerData brokerData : brokerDatas) { + String addr = brokerData.selectBrokerAddr(); + if (addr != null) { + Map<MessageQueue, Long> offsetTable = + this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, + timeoutMillis, isC); + if (offsetTable != null) { + allOffsetTable.putAll(offsetTable); + } + } + } + } + return allOffsetTable; + } + + private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue, OffsetWrapper offsetWrapper, + long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException { + long resetOffset; + if (timestamp == -1) { + + resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis); + } else { + resetOffset = + this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, + timeoutMillis); + } + + + RollbackStats rollbackStats = new RollbackStats(); + rollbackStats.setBrokerName(queue.getBrokerName()); + rollbackStats.setQueueId(queue.getQueueId()); + rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset()); + rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset()); + rollbackStats.setTimestampOffset(resetOffset); + rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset()); + + if (force || resetOffset <= offsetWrapper.getConsumerOffset()) { + rollbackStats.setRollbackOffset(resetOffset); + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup(consumeGroup); + requestHeader.setTopic(queue.getTopic()); + requestHeader.setQueueId(queue.getQueueId()); + requestHeader.setCommitOffset(resetOffset); + this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); + } + return rollbackStats; + } + + @Override + public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas(); + if (brokerDatas != null && brokerDatas.size() > 0) { + String addr = brokerDatas.get(0).selectBrokerAddr(); + if (addr != null) { + return this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToGetConsumerStatus(addr, topic, group, clientAddr, + timeoutMillis); + } + } + return Collections.EMPTY_MAP; + } + + public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + + if (isCluster) { + this.mqClientInstance.getMQClientAPIImpl() + .putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, value, timeoutMillis); + } else { + String oldOrderConfs = null; + try { + oldOrderConfs = + this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, + timeoutMillis); + } catch (Exception e) { + e.printStackTrace(); + } + + Map<String, String> orderConfMap = new HashMap<String, String>(); + if (!UtilAll.isBlank(oldOrderConfs)) { + String[] oldOrderConfArr = oldOrderConfs.split(";"); + for (String oldOrderConf : oldOrderConfArr) { + String[] items = oldOrderConf.split(":"); + orderConfMap.put(items[0], oldOrderConf); + } + } + String[] items = value.split(":"); + orderConfMap.put(items[0], value); + + StringBuilder newOrderConf = new StringBuilder(); + String splitor = ""; + for (Map.Entry<String, String> entry : orderConfMap.entrySet()) { + newOrderConf.append(splitor).append(entry.getValue()); + splitor = ";"; + } + this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, + newOrderConf.toString(), timeoutMillis); + } + } + + @Override + public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException, + MQClientException { + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + return this.mqClientInstance.getMQClientAPIImpl().queryTopicConsumeByWho(addr, topic, timeoutMillis); + } + + break; + } + + return null; + } + + @Override + public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, + RemotingException, MQClientException { + List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>(); + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + spanSet.addAll(this.mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis)); + } + } + return spanSet; + } + + @Override + public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + boolean result = false; + try { + ClusterInfo clusterInfo = examineBrokerClusterInfo(); + if (null == cluster || "".equals(cluster)) { + for (String targetCluster : clusterInfo.retrieveAllClusterNames()) { + result = cleanExpiredConsumerQueueByCluster(clusterInfo, targetCluster); + } + } else { + result = cleanExpiredConsumerQueueByCluster(clusterInfo, cluster); + } + } catch (MQBrokerException e) { + log.error("cleanExpiredConsumerQueue error.", e); + } + + return result; + } + + public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + boolean result = false; + String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster); + for (String addr : addrs) { + result = cleanExpiredConsumerQueueByAddr(addr); + } + return result; + } + + @Override + public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis); + log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result); + return result; + } + + @Override + public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + boolean result = false; + try { + ClusterInfo clusterInfo = examineBrokerClusterInfo(); + if (null == cluster || "".equals(cluster)) { + for (String targetCluster : clusterInfo.retrieveAllClusterNames()) { + result = cleanUnusedTopicByCluster(clusterInfo, targetCluster); + } + } else { + result = cleanUnusedTopicByCluster(clusterInfo, cluster); + } + } catch (MQBrokerException e) { + log.error("cleanExpiredConsumerQueue error.", e); + } + + return result; + } + + public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + boolean result = false; + String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster); + for (String addr : addrs) { + result = cleanUnusedTopicByAddr(addr); + } + return result; + } + + @Override + public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis); + log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result); + return result; + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException, + MQClientException, InterruptedException { + String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup; + TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas(); + if (brokerDatas != null) { + for (BrokerData brokerData : brokerDatas) { + String addr = brokerData.selectBrokerAddr(); + if (addr != null) { + return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack, + timeoutMillis * 3); + } + } + } + return null; + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MessageExt msg = this.viewMessage(msgId); + + return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), + consumerGroup, clientId, msgId, timeoutMillis * 3); + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MessageExt msg = this.viewMessage(topic, msgId); + if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { + return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), + consumerGroup, clientId, msgId, timeoutMillis * 3); + } else { + MessageClientExt msgClient = (MessageClientExt) msg; + return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), + consumerGroup, clientId, msgClient.getOffsetMsgId(), timeoutMillis * 3); + } + } + + @Override + public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + List<MessageTrack> result = new ArrayList<MessageTrack>(); + + GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic()); + + for (String group : groupList.getGroupList()) { + + MessageTrack mt = new MessageTrack(); + mt.setConsumerGroup(group); + mt.setTrackType(TrackType.UNKNOWN); + ConsumerConnection cc = null; + try { + cc = this.examineConsumerConnectionInfo(group); + } catch (MQBrokerException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + mt.setTrackType(TrackType.NOT_ONLINE); + } + mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage()); + result.add(mt); + continue; + } catch (Exception e) { + mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e)); + result.add(mt); + continue; + } + + switch (cc.getConsumeType()) { + case CONSUME_ACTIVELY: + mt.setTrackType(TrackType.PULL); + break; + case CONSUME_PASSIVELY: + boolean ifConsumed = false; + try { + ifConsumed = this.consumed(msg, group); + } catch (MQClientException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + mt.setTrackType(TrackType.NOT_ONLINE); + } + mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage()); + result.add(mt); + continue; + } catch (MQBrokerException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + mt.setTrackType(TrackType.NOT_ONLINE); + } + mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage()); + result.add(mt); + continue; + } catch (Exception e) { + mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e)); + result.add(mt); + continue; + } + + if (ifConsumed) { + mt.setTrackType(TrackType.CONSUMED); + Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<String, SubscriptionData> next = it.next(); + if (next.getKey().equals(msg.getTopic())) { + if (next.getValue().getTagsSet().contains(msg.getTags()) + || next.getValue().getTagsSet().contains("*") + || next.getValue().getTagsSet().isEmpty()) { + } else { + mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED); + } + } + } + } else { + mt.setTrackType(TrackType.NOT_CONSUME_YET); + } + break; + default: + break; + } + result.add(mt); + } + + return result; + } + + public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + + ConsumeStats cstats = this.examineConsumeStats(group); + + ClusterInfo ci = this.examineBrokerClusterInfo(); + + Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, OffsetWrapper> next = it.next(); + MessageQueue mq = next.getKey(); + if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) { + BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName()); + if (brokerData != null) { + String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) { + if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) { + return true; + } + } + } + } + } + + return false; + } + + @Override + public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException { + String retryTopic = MixAll.getRetryTopic(srcGroup); + TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic); + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(addr, srcGroup, destGroup, topic, isOffline, timeoutMillis * 3); + } + } + } + + @Override + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis); + } + + @Override + public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().getClusterList(topic, timeoutMillis); + } + + @Override + public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); + } + + @Override + public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, + RemotingException { + Set<String> clusterSet = new HashSet<String>(); + ClusterInfo clusterInfo = examineBrokerClusterInfo(); + TopicRouteData topicRouteData = examineTopicRouteInfo(topic); + BrokerData brokerData = topicRouteData.getBrokerDatas().get(0); + String brokerName = brokerData.getBrokerName(); + Iterator<Map.Entry<String, Set<String>>> it = clusterInfo.getClusterAddrTable().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, Set<String>> next = it.next(); + if (next.getValue().contains(brokerName)) { + clusterSet.add(next.getKey()); + } + } + return clusterSet; + } + + @Override + public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); + } + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().maxOffset(mq); + } + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().minOffset(mq); + } + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().earliestMsgStoreTime(mq); + } + + @Override + public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId); + } + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, + InterruptedException { + return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException { + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup(consumeGroup); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setCommitOffset(offset); + this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); + } + + @Override + public void updateNameServerConfig(final Properties properties, final List<String> nameServers) + throws InterruptedException, RemotingConnectException, + UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, + MQClientException, MQBrokerException { + this.mqClientInstance.getMQClientAPIImpl().updateNameServerConfig(properties, nameServers, timeoutMillis); + } + + @Override + public Map<String, Properties> getNameServerConfig(final List<String> nameServers) + throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQClientException, + UnsupportedEncodingException { + return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java new file mode 100644 index 0000000..0075983 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.admin; + +import com.alibaba.rocketmq.client.MQAdmin; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.admin.RollbackStats; +import com.alibaba.rocketmq.common.admin.TopicStatsTable; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.*; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; +import com.alibaba.rocketmq.remoting.exception.*; +import com.alibaba.rocketmq.tools.admin.api.MessageTrack; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +public interface MQAdminExt extends MQAdmin { + void start() throws MQClientException; + + void shutdown(); + + void updateBrokerConfig(final String brokerAddr, final Properties properties) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; + + Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; + + void createAndUpdateTopicConfig(final String addr, final TopicConfig config) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; + + SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); + + TopicConfig examineTopicConfig(final String addr, final String topic); + + TopicStatsTable examineTopicStats(final String topic) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException; + + TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException; + + TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException; + + KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException; + + ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException; + + ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, + InterruptedException, MQBrokerException; + + ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException; + + TopicRouteData examineTopicRouteInfo(final String topic) throws RemotingException, MQClientException, InterruptedException; + + ConsumerConnection examineConsumerConnectionInfo(final String consumerGroup) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, + MQClientException; + + ProducerConnection examineProducerConnectionInfo(final String producerGroup, final String topic) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException; + + List<String> getNameServerAddressList(); + + int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; + + void putKVConfig(final String namespace, final String key, final String value); + + String getKVConfig(final String namespace, final String key) throws RemotingException, MQClientException, InterruptedException; + + KVTable getKVListByNamespace(final String namespace) throws RemotingException, MQClientException, InterruptedException; + + void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void deleteTopicInNameServer(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException; + + List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; + + void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException; + + List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, + RemotingException, MQClientException; + + boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; + + boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; + + boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; + + + boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; + + ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId, final boolean jstack) + throws RemotingException, MQClientException, InterruptedException; + + ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, + String clientId, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + + ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, + String clientId, + String topic, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + + List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException; + + void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException; + + BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String statsName, final String statsKey) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException; + + Set<String> getClusterList(final String topic) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; + + ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException; + + Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException; + + SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException; + + TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException; + + void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException; + + /** + * Update name server config. + * <br> + * Command Code : RequestCode.UPDATE_NAMESRV_CONFIG + * + * <br> If param(nameServers) is null or empty, will use name servers from ns! + * + * @param properties + * @param nameServers + * + * @throws InterruptedException + * @throws RemotingConnectException + * @throws UnsupportedEncodingException + * @throws RemotingSendRequestException + * @throws RemotingTimeoutException + * @throws MQClientException + * @throws MQBrokerException + */ + void updateNameServerConfig(final Properties properties, final List<String> nameServers) throws InterruptedException, RemotingConnectException, + UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, + MQClientException, MQBrokerException; + + /** + * Get name server config. + * <br> + * Command Code : RequestCode.GET_NAMESRV_CONFIG + * <br> If param(nameServers) is null or empty, will use name servers from ns! + * + * @param nameServers + * + * @return The fetched name server config + * + * @throws InterruptedException + * @throws RemotingTimeoutException + * @throws RemotingSendRequestException + * @throws RemotingConnectException + * @throws MQClientException + * @throws UnsupportedEncodingException + */ + Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, + MQClientException, UnsupportedEncodingException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java new file mode 100644 index 0000000..bdc7288 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.admin.api; + +public class MessageTrack { + private String consumerGroup; + private TrackType trackType; + private String exceptionDesc; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public TrackType getTrackType() { + return trackType; + } + + + public void setTrackType(TrackType trackType) { + this.trackType = trackType; + } + + + public String getExceptionDesc() { + return exceptionDesc; + } + + + public void setExceptionDesc(String exceptionDesc) { + this.exceptionDesc = exceptionDesc; + } + + + @Override + public String toString() { + return "MessageTrack [consumerGroup=" + consumerGroup + ", trackType=" + trackType + + ", exceptionDesc=" + exceptionDesc + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java new file mode 100644 index 0000000..ca475ac --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.tools.admin.api; + +public enum TrackType { + CONSUMED, + CONSUMED_BUT_FILTERED, + PULL, + NOT_CONSUME_YET, + NOT_ONLINE, + UNKNOWN +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java new file mode 100644 index 0000000..1b5d264 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.tools.admin.MQAdminExt; + +import java.util.*; + + +/** + * @author shijia.wxr + */ +public class CommandUtil { + + public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish( + final MQAdminExt adminExt, final String clusterName) + throws InterruptedException, RemotingConnectException, + RemotingTimeoutException, RemotingSendRequestException, + MQBrokerException { + Map<String, List<String>> masterAndSlaveMap = new HashMap<String, List<String>>(4); + + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); + + if (brokerNameSet == null) { + System.out + .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + return masterAndSlaveMap; + } + + for (String brokerName : brokerNameSet) { + BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + + if (brokerData == null || brokerData.getBrokerAddrs() == null) { + continue; + } + + String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + masterAndSlaveMap.put(masterAddr, new ArrayList<String>()); + + for (Long id : brokerData.getBrokerAddrs().keySet()) { + if (brokerData.getBrokerAddrs().get(id) == null + || id.longValue() == MixAll.MASTER_ID) { + continue; + } + + masterAndSlaveMap.get(masterAddr).add(brokerData.getBrokerAddrs().get(id)); + } + } + + return masterAndSlaveMap; + } + + public static Set<String> fetchMasterAddrByClusterName(final MQAdminExt adminExt, final String clusterName) + throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, MQBrokerException { + Set<String> masterSet = new HashSet<String>(); + + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + + Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); + + if (brokerNameSet != null) { + for (String brokerName : brokerNameSet) { + BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + + String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + if (addr != null) { + masterSet.add(addr); + } + } + } + } else { + System.out + .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + } + + return masterSet; + } + + public static Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName) + throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, MQBrokerException { + Set<String> masterSet = new HashSet<String>(); + + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + + Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); + + if (brokerNameSet != null) { + for (String brokerName : brokerNameSet) { + BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + final Collection<String> addrs = brokerData.getBrokerAddrs().values(); + masterSet.addAll(addrs); + } + } + } else { + System.out + .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct."); + } + + return masterSet; + } + + + public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName) + throws Exception { + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); + if (brokerNameSet.isEmpty()) { + throw new Exception( + "Make sure the specified clusterName exists or the nameserver which connected is correct."); + } + return brokerNameSet; + } + + + public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception { + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + HashMap<String/* brokerName */, BrokerData> brokerAddrTable = + clusterInfoSerializeWrapper.getBrokerAddrTable(); + Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, BrokerData> entry = it.next(); + HashMap<Long, String> brokerAddrs = entry.getValue().getBrokerAddrs(); + if (brokerAddrs.containsValue(addr)) + return entry.getKey(); + } + throw new Exception( + "Make sure the specified broker addr exists or the nameserver which connected is correct."); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java new file mode 100644 index 0000000..56ecc59 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import ch.qos.logback.core.joran.spi.JoranException; +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.tools.command.broker.*; +import com.alibaba.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand; +import com.alibaba.rocketmq.tools.command.cluster.ClusterListSubCommand; +import com.alibaba.rocketmq.tools.command.connection.ConsumerConnectionSubCommand; +import com.alibaba.rocketmq.tools.command.connection.ProducerConnectionSubCommand; +import com.alibaba.rocketmq.tools.command.consumer.*; +import com.alibaba.rocketmq.tools.command.message.*; +import com.alibaba.rocketmq.tools.command.namesrv.*; +import com.alibaba.rocketmq.tools.command.offset.CloneGroupOffsetCommand; +import com.alibaba.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; +import com.alibaba.rocketmq.tools.command.stats.StatsAllSubCommand; +import com.alibaba.rocketmq.tools.command.topic.*; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class MQAdminStartup { + protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>(); + + public static void main(String[] args) { + main0(args, null); + } + + public static void main0(String[] args, RPCHook rpcHook) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + + //PackageConflictDetect.detectFastjson(); + + initCommand(); + + try { + initLogback(); + switch (args.length) { + case 0: + printHelp(); + break; + case 2: + if (args[0].equals("help")) { + SubCommand cmd = findSubCommand(args[1]); + if (cmd != null) { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + options = cmd.buildCommandlineOptions(options); + if (options != null) { + ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options); + } + } else { + System.out.printf("The sub command \'" + args[1] + "\' not exist.%n"); + } + break; + } + case 1: + default: + SubCommand cmd = findSubCommand(args[0]); + if (cmd != null) { + String[] subargs = parseSubArgs(args); + + + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + System.exit(-1); + return; + } + + if (commandLine.hasOption('n')) { + String namesrvAddr = commandLine.getOptionValue('n'); + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); + } + + cmd.execute(commandLine, options, rpcHook); + } else { + System.out.printf("The sub command \'" + args[0] + "\' not exist.%n"); + } + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void initCommand() { + initCommand(new UpdateTopicSubCommand()); + initCommand(new DeleteTopicSubCommand()); + initCommand(new UpdateSubGroupSubCommand()); + initCommand(new DeleteSubscriptionGroupCommand()); + initCommand(new UpdateBrokerConfigSubCommand()); + initCommand(new UpdateTopicPermSubCommand()); + + initCommand(new TopicRouteSubCommand()); + initCommand(new TopicStatusSubCommand()); + initCommand(new TopicClusterSubCommand()); + + + initCommand(new BrokerStatusSubCommand()); + initCommand(new QueryMsgByIdSubCommand()); + initCommand(new QueryMsgByKeySubCommand()); + initCommand(new QueryMsgByUniqueKeySubCommand()); + initCommand(new QueryMsgByOffsetSubCommand()); + initCommand(new QueryMsgByUniqueKeySubCommand()); + initCommand(new PrintMessageSubCommand()); + initCommand(new PrintMessageByQueueCommand()); + initCommand(new SendMsgStatusCommand()); + initCommand(new BrokerConsumeStatsSubCommad()); + + + initCommand(new ProducerConnectionSubCommand()); + initCommand(new ConsumerConnectionSubCommand()); + initCommand(new ConsumerProgressSubCommand()); + initCommand(new ConsumerStatusSubCommand()); + initCommand(new CloneGroupOffsetCommand()); + + initCommand(new ClusterListSubCommand()); + initCommand(new TopicListSubCommand()); + + initCommand(new UpdateKvConfigCommand()); + initCommand(new DeleteKvConfigCommand()); + + initCommand(new WipeWritePermSubCommand()); + initCommand(new ResetOffsetByTimeCommand()); + + initCommand(new UpdateOrderConfCommand()); + initCommand(new CleanExpiredCQSubCommand()); + initCommand(new CleanUnusedTopicCommand()); + + initCommand(new StartMonitoringSubCommand()); + initCommand(new StatsAllSubCommand()); + + initCommand(new AllocateMQSubCommand()); + + initCommand(new CheckMsgSendRTCommand()); + initCommand(new CLusterSendMsgRTCommand()); + + initCommand(new GetNamesrvConfigCommand()); + initCommand(new UpdateNamesrvConfigCommand()); + initCommand(new GetBrokerConfigCommand()); + } + + private static void initLogback() throws JoranException { + String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml"); + } + + private static void printHelp() { + System.out.printf("The most commonly used mqadmin commands are:%n"); + + for (SubCommand cmd : subCommandList) { + System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc()); + } + + System.out.printf("%nSee 'mqadmin help <command>' for more information on a specific command."); + } + + private static SubCommand findSubCommand(final String name) { + for (SubCommand cmd : subCommandList) { + if (cmd.commandName().toUpperCase().equals(name.toUpperCase())) { + return cmd; + } + } + + return null; + } + + private static String[] parseSubArgs(String[] args) { + if (args.length > 1) { + String[] result = new String[args.length - 1]; + for (int i = 0; i < args.length - 1; i++) { + result[i] = args[i + 1]; + } + return result; + } + return null; + } + + public static void initCommand(SubCommand command) { + subCommandList.add(command); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java new file mode 100644 index 0000000..e28b7bc --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command; + +import com.alibaba.rocketmq.remoting.RPCHook; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + + +/** + * @author shijia.wxr + */ +public interface SubCommand { + public String commandName(); + + + public String commandDesc(); + + + public Options buildCommandlineOptions(final Options options); + + + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java new file mode 100644 index 0000000..0f24df7 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.command.broker; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.admin.OffsetWrapper; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.ConsumeStatsList; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.SubCommand; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.*; + + +/** + * @author shijia.wxr + */ +public class BrokerConsumeStatsSubCommad implements SubCommand { + + @Override + public String commandName() { + return "brokerConsumeStats"; + } + + @Override + public String commandDesc() { + return "Fetch broker consume stats data"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerAddr", true, "Broker address"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "timeoutMillis", true, "request timeout Millis"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("l", "level", true, "threshold of print diff"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("o", "order", true, "order topic"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + defaultMQAdminExt.start(); + String brokerAddr = commandLine.getOptionValue('b').trim(); + boolean isOrder = false; + long timeoutMillis = 50000; + long diffLevel = 0; + if (commandLine.hasOption('o')) { + isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim()); + } + if (commandLine.hasOption('t')) { + timeoutMillis = Long.parseLong(commandLine.getOptionValue('t').trim()); + } + if (commandLine.hasOption('l')) { + diffLevel = Long.parseLong(commandLine.getOptionValue('l').trim()); + } + + ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); + System.out.printf("%-32s %-32s %-32s %-4s %-20s %-20s %-20s %s%n", + "#Topic", + "#Group", + "#Broker Name", + "#QID", + "#Broker Offset", + "#Consumer Offset", + "#Diff", + "#LastTime"); + for (Map<String, List<ConsumeStats>> map : consumeStatsList.getConsumeStatsList()) { + for (Map.Entry<String, List<ConsumeStats>> entry : map.entrySet()) { + String group = entry.getKey(); + List<ConsumeStats> consumeStatsArray = entry.getValue(); + for (ConsumeStats consumeStats : consumeStatsArray) { + List<MessageQueue> mqList = new LinkedList<MessageQueue>(); + mqList.addAll(consumeStats.getOffsetTable().keySet()); + Collections.sort(mqList); + for (MessageQueue mq : mqList) { + OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq); + long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset(); + + if (diff < diffLevel) { + continue; + } + String lastTime = "-"; + try { + lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS); + } catch (Exception e) { + + } + if (offsetWrapper.getLastTimestamp() > 0) + System.out.printf("%-32s %-32s %-32s %-4d %-20d %-20d %-20d %s%n", + UtilAll.frontStringAtLeast(mq.getTopic(), 32), + group, + UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), + mq.getQueueId(), + offsetWrapper.getBrokerOffset(), + offsetWrapper.getConsumerOffset(), + diff, + lastTime + ); + } + } + } + } + System.out.printf("%nDiff Total: %d%n", consumeStatsList.getTotalDiff()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + defaultMQAdminExt.shutdown(); + } + } +}
