http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
new file mode 100644
index 0000000..1de96db
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -0,0 +1,933 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.admin;
+
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.admin.MQAdminExtInner;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.MQClientManager;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ServiceState;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.*;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.*;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import 
com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
+import com.alibaba.rocketmq.tools.admin.api.TrackType;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.Map.Entry;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
+    // Logger obtained from the client logging facility.
+    private final Logger log = ClientLogger.getLog();
+    // Facade that owns this implementation; supplies the admin group name and the create-topic key.
+    private final DefaultMQAdminExt defaultMQAdminExt;
+    // Lifecycle state; driven by start() and shutdown().
+    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    // Client instance used for every remote call; assigned in start(), so it is null before that.
+    private MQClientInstance mqClientInstance;
+    // Optional hook handed to the client instance on creation; may be null.
+    private RPCHook rpcHook;
+    // Timeout passed to the remote calls; this default is overwritten by both constructors.
+    private long timeoutMillis = 20000;
+    // Used to pick a random broker when examining connection info.
+    private Random random = new Random();
+
+
+    /**
+     * Creates an implementation without an RPC hook.
+     *
+     * @param defaultMQAdminExt the owning facade
+     * @param timeoutMillis timeout passed to the remote calls
+     */
+    public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long 
timeoutMillis) {
+        this(defaultMQAdminExt, null, timeoutMillis);
+    }
+
+
+    /**
+     * Creates an implementation; no remote resources are acquired until {@link #start()}.
+     *
+     * @param defaultMQAdminExt the owning facade
+     * @param rpcHook hook handed to the client instance in {@link #start()}; may be null
+     * @param timeoutMillis timeout passed to the remote calls
+     */
+    public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, RPCHook 
rpcHook, long timeoutMillis) {
+        this.defaultMQAdminExt = defaultMQAdminExt;
+        this.rpcHook = rpcHook;
+        this.timeoutMillis = timeoutMillis;
+    }
+
+
+    /**
+     * Starts the admin client: obtains the client instance, registers this admin
+     * group on it and starts it. Only allowed from the CREATE_JUST state.
+     *
+     * @throws MQClientException if the admin group is already registered, or if the
+     *         service is in the RUNNING, START_FAILED or SHUTDOWN_ALREADY state
+     */
+    @Override
+    public void start() throws MQClientException {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                // Mark as failed up front; the state only becomes RUNNING once every step below succeeds.
+                this.serviceState = ServiceState.START_FAILED;
+
+                this.defaultMQAdminExt.changeInstanceNameToPID();
+
+                this.mqClientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt,
 rpcHook);
+
+                boolean registerOK = 
mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), 
this);
+                if (!registerOK) {
+                    // Registration failure resets the state so start() can be attempted again.
+                    this.serviceState = ServiceState.CREATE_JUST;
+                    throw new MQClientException("The adminExt group[" + 
this.defaultMQAdminExt.getAdminExtGroup()
+                            + "] has created already, specifed another name 
please."//
+                            + 
FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
+                }
+
+                // If this throws, the state stays START_FAILED.
+                mqClientInstance.start();
+
+                log.info("the adminExt [{}] start OK", 
this.defaultMQAdminExt.getAdminExtGroup());
+
+                this.serviceState = ServiceState.RUNNING;
+                break;
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                // Any state other than CREATE_JUST is rejected.
+                throw new MQClientException("The AdminExt service state not 
OK, maybe started once, "//
+                        + this.serviceState//
+                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), 
null);
+            default:
+                break;
+        }
+    }
+
+
+    /**
+     * Shuts the admin client down. Only a RUNNING service has anything to release:
+     * the admin group is unregistered, the client instance is shut down and the
+     * state becomes SHUTDOWN_ALREADY. In every other state this is a no-op.
+     */
+    @Override
+    public void shutdown() {
+        switch (this.serviceState) {
+            case RUNNING: {
+                final String adminGroup = this.defaultMQAdminExt.getAdminExtGroup();
+                this.mqClientInstance.unregisterAdminExt(adminGroup);
+                this.mqClientInstance.shutdown();
+
+                log.info("the adminExt [{}] shutdown OK", this.defaultMQAdminExt.getAdminExtGroup());
+                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+                break;
+            }
+            case CREATE_JUST:
+            case SHUTDOWN_ALREADY:
+            default:
+                // Nothing was started, or everything is already released.
+                break;
+        }
+    }
+
+    /**
+     * Sends the given properties to the broker at {@code brokerAddr} by delegating
+     * to the client API's {@code updateBrokerConfig} with this instance's timeout.
+     *
+     * @param brokerAddr address of the target broker
+     * @param properties configuration entries to send
+     */
+    @Override
+    public void updateBrokerConfig(String brokerAddr, Properties properties) 
throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, UnsupportedEncodingException, 
InterruptedException, MQBrokerException {
+        
this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, 
properties, timeoutMillis);
+    }
+
+    /**
+     * Fetches the configuration of the broker at {@code brokerAddr} by delegating
+     * to the client API's {@code getBrokerConfig} with this instance's timeout.
+     *
+     * @param brokerAddr address of the target broker
+     * @return whatever the client API returns for that broker
+     */
+    @Override
+    public Properties getBrokerConfig(final String brokerAddr) throws 
RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getBrokerConfig(brokerAddr, 
timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code createTopic} on the broker at {@code addr},
+     * passing the facade's create-topic key and this instance's timeout.
+     *
+     * @param addr address of the target broker
+     * @param config topic configuration to send
+     */
+    @Override
+    public void createAndUpdateTopicConfig(String addr, TopicConfig config) 
throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, 
this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code createSubscriptionGroup} on the broker
+     * at {@code addr} with this instance's timeout.
+     *
+     * @param addr address of the target broker
+     * @param config subscription group configuration to send
+     */
+    @Override
+    public void createAndUpdateSubscriptionGroupConfig(String addr, 
SubscriptionGroupConfig config) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        
this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, 
config, timeoutMillis);
+    }
+
+    /**
+     * Not implemented: performs no remote call and ignores both arguments.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, 
String group) {
+        // TODO: unimplemented stub; callers must be prepared for the null result.
+        return null;
+    }
+
+    /**
+     * Not implemented: performs no remote call and ignores both arguments.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public TopicConfig examineTopicConfig(String addr, String topic) {
+        // TODO: unimplemented stub; callers must be prepared for the null result.
+        return null;
+    }
+
+    /**
+     * Collects the offset table of {@code topic} from every broker in its route
+     * that has a selectable address and merges them into one table.
+     *
+     * @param topic topic to examine
+     * @return the merged stats table, never empty
+     * @throws MQClientException if no broker contributed any offset entry
+     */
+    @Override
+    public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException,
+            InterruptedException, MQBrokerException {
+        final TopicRouteData route = this.examineTopicRouteInfo(topic);
+        final TopicStatsTable merged = new TopicStatsTable();
+
+        for (BrokerData broker : route.getBrokerDatas()) {
+            final String brokerAddr = broker.selectBrokerAddr();
+            if (brokerAddr == null) {
+                // Broker without a selectable address contributes nothing.
+                continue;
+            }
+            final TopicStatsTable perBroker =
+                    this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
+            merged.getOffsetTable().putAll(perBroker.getOffsetTable());
+        }
+
+        if (!merged.getOffsetTable().isEmpty()) {
+            return merged;
+        }
+        throw new MQClientException("Not found the topic stats info", null);
+    }
+
+    /**
+     * Delegates to the client API's {@code getTopicListFromNameServer} with this
+     * instance's timeout.
+     *
+     * @return the topic list returned by the client API
+     */
+    @Override
+    public TopicList fetchAllTopicList() throws RemotingException, 
MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code getTopicsByCluster} with this instance's
+     * timeout.
+     *
+     * @param clusterName name of the cluster to query
+     * @return the topic list returned by the client API
+     */
+    @Override
+    public TopicList fetchTopicsByCLuster(String clusterName) throws 
RemotingException, MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, 
timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code getBrokerRuntimeInfo} for the broker at
+     * {@code brokerAddr} with this instance's timeout.
+     *
+     * @param brokerAddr address of the target broker
+     * @return the key/value table returned by the client API
+     */
+    @Override
+    public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, 
timeoutMillis);
+    }
+
+    /**
+     * Convenience overload: same as {@code examineConsumeStats(consumerGroup, null)}.
+     *
+     * @param consumerGroup consumer group to examine
+     * @return the merged consume stats
+     */
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup) throws 
RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        return examineConsumeStats(consumerGroup, null);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup, String 
topic) throws RemotingException, MQClientException,
+            InterruptedException, MQBrokerException {
+        String retryTopic = MixAll.getRetryTopic(consumerGroup);
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
+        ConsumeStats result = new ConsumeStats();
+
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String addr = bd.selectBrokerAddr();
+            if (addr != null) {
+                ConsumeStats consumeStats =
+                        
this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, 
topic, timeoutMillis * 3);
+                result.getOffsetTable().putAll(consumeStats.getOffsetTable());
+                double value = result.getConsumeTps() + 
consumeStats.getConsumeTps();
+                result.setConsumeTps(value);
+            }
+        }
+
+        if (result.getOffsetTable().isEmpty()) {
+            throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
+                    "Not found the consumer group consume stats, because 
return offset table is empty, maybe the consumer not consume any message");
+        }
+
+        return result;
+    }
+
+    /**
+     * Delegates to the client API's {@code getBrokerClusterInfo} with this
+     * instance's timeout.
+     *
+     * @return the cluster info returned by the client API
+     */
+    @Override
+    public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code getTopicRouteInfoFromNameServer} with
+     * this instance's timeout. Most multi-broker operations in this class start
+     * from the route returned here.
+     *
+     * @param topic topic whose route is requested
+     * @return the route data returned by the client API
+     */
+    @Override
+    public TopicRouteData examineTopicRouteInfo(String topic) throws 
RemotingException, MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic,
 timeoutMillis);
+    }
+
+
+    /**
+     * Looks a message up by id. The id is first decoded with
+     * {@code MessageDecoder.decodeMessageId} and, if that succeeds, looked up via
+     * {@code viewMessage(msgId)}. Any exception from either step is logged at WARN
+     * level, and the lookup falls back to {@code queryMessageByUniqKey(topic, msgId)}.
+     *
+     * @param topic topic used by the fallback lookup
+     * @param msgId message id to look up
+     * @return the message found by whichever lookup succeeded
+     */
+    @Override
+    public MessageExt viewMessage(String topic, String msgId) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            // The decode result is discarded; the call only checks that msgId can be decoded.
+            MessageDecoder.decodeMessageId(msgId);
+            return this.viewMessage(msgId);
+        } catch (Exception e) {
+            // Deliberately broad: a failed lookup also falls through to the fallback query.
+            log.warn("the msgId maybe created by new client. msgId={}", msgId, 
e);
+        }
+        return 
this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
+    }
+
+    @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String 
consumerGroup) throws InterruptedException, MQBrokerException,
+            RemotingException, MQClientException {
+        ConsumerConnection result = new ConsumerConnection();
+        String topic = MixAll.getRetryTopic(consumerGroup);
+        List<BrokerData> brokers = 
this.examineTopicRouteInfo(topic).getBrokerDatas();
+        BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
+        String addr = null;
+        if (brokerData != null) {
+            addr = brokerData.selectBrokerAddr();
+            if (StringUtils.isNotBlank(addr)) {
+                result = 
this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, 
consumerGroup, timeoutMillis);
+            }
+        }
+
+        if (result.getConnectionSet().isEmpty()) {
+            log.warn("the consumer group not online. brokerAddr={}, group={}", 
addr, consumerGroup);
+            throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not 
found the consumer group connection");
+        }
+
+        return result;
+    }
+
+    /**
+     * Fetches the connection list of {@code producerGroup} from one randomly chosen
+     * broker in the route of {@code topic}.
+     *
+     * @param producerGroup producer group to examine
+     * @param topic topic whose route supplies the candidate brokers
+     * @return the connection info, with a non-empty connection set
+     * @throws MQClientException if the route has no brokers, the chosen broker has
+     *         no usable address, or the broker reports no connections
+     */
+    @Override
+    public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        ProducerConnection result = new ProducerConnection();
+        List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
+        String addr = null;
+        // Random.nextInt(0) throws IllegalArgumentException, so an empty (or missing) broker list
+        // must be ruled out before picking; it is reported as "not found" below instead.
+        if (brokers != null && !brokers.isEmpty()) {
+            BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
+            if (brokerData != null) {
+                addr = brokerData.selectBrokerAddr();
+                if (StringUtils.isNotBlank(addr)) {
+                    result = this.mqClientInstance.getMQClientAPIImpl()
+                            .getProducerConnectionList(addr, producerGroup, timeoutMillis);
+                }
+            }
+        }
+
+        if (result.getConnectionSet().isEmpty()) {
+            log.warn("the producer group not online. brokerAddr={}, group={}", addr, producerGroup);
+            throw new MQClientException("Not found the producer group connection", null);
+        }
+
+        return result;
+    }
+
+    /**
+     * Returns the name server address list held by the client API.
+     *
+     * @return the list returned by the client API's {@code getNameServerAddressList}
+     */
+    @Override
+    public List<String> getNameServerAddressList() {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList();
+    }
+
+    /**
+     * Delegates to the client API's {@code wipeWritePermOfBroker} on the name
+     * server at {@code namesrvAddr} with this instance's timeout.
+     *
+     * @param namesrvAddr address of the name server to contact
+     * @param brokerName name of the broker concerned
+     * @return the int returned by the client API
+     */
+    @Override
+    public int wipeWritePermOfBroker(final String namesrvAddr, String 
brokerName) throws RemotingCommandException,
+            RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, 
brokerName, timeoutMillis);
+    }
+
+    /**
+     * Not implemented: the body is empty, so nothing is stored and all arguments
+     * are ignored. {@code createAndUpdateKvConfig} is the method in this class that
+     * actually writes a KV config value.
+     */
+    @Override
+    public void putKVConfig(String namespace, String key, String value) {
+        // TODO: unimplemented stub; intentionally does nothing for now.
+
+    }
+
+    /**
+     * Delegates to the client API's {@code getKVConfigValue} with this instance's
+     * timeout.
+     *
+     * @param namespace KV namespace to read from
+     * @param key key to read
+     * @return the value returned by the client API
+     */
+    @Override
+    public String getKVConfig(String namespace, String key) throws 
RemotingException, MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, 
timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code getKVListByNamespace} with this
+     * instance's timeout.
+     *
+     * @param namespace KV namespace to list
+     * @return the key/value table returned by the client API
+     */
+    @Override
+    public KVTable getKVListByNamespace(String namespace) throws 
RemotingException, MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, 
timeoutMillis);
+    }
+
+    /**
+     * Calls the client API's {@code deleteTopicInBroker} once per address, in the
+     * set's iteration order. There is no per-address error handling, so the first
+     * exception ends the loop and the remaining addresses are not contacted.
+     *
+     * @param addrs broker addresses to contact; must not be null
+     * @param topic topic to delete
+     */
+    @Override
+    public void deleteTopicInBroker(Set<String> addrs, String topic) throws 
RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        for (String addr : addrs) {
+            
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, 
timeoutMillis);
+        }
+    }
+
+    /**
+     * Calls the client API's {@code deleteTopicInNameServer} once per name server
+     * address. The first exception ends the loop.
+     *
+     * @param addrs name server addresses to contact; when null, the address string
+     *        fetched through the client API is split on ';' and used instead
+     * @param topic topic to delete
+     * @throws MQClientException if {@code addrs} is null and no name server address
+     *         could be fetched
+     */
+    @Override
+    public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        Set<String> targets = addrs;
+        if (targets == null) {
+            String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
+            // NOTE(review): assumes fetchNameServerAddr() can return null when nothing is
+            // configured; fail with a declared exception rather than a NullPointerException.
+            if (ns == null) {
+                throw new MQClientException("No name server address available to delete topic " + topic, null);
+            }
+            // Typed set instead of the raw HashSet, which compiled only with an unchecked warning.
+            targets = new HashSet<String>(Arrays.asList(ns.split(";")));
+        }
+        for (String addr : targets) {
+            this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis);
+        }
+    }
+
+    /**
+     * Delegates to the client API's {@code deleteSubscriptionGroup} on the broker
+     * at {@code addr} with this instance's timeout.
+     *
+     * @param addr address of the target broker
+     * @param groupName subscription group to delete
+     */
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName) throws 
RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, 
groupName, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code putKVConfigValue} with this instance's
+     * timeout.
+     *
+     * @param namespace KV namespace to write to
+     * @param key key to write
+     * @param value value to store
+     */
+    @Override
+    public void createAndUpdateKvConfig(String namespace, String key, String 
value) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, 
key, value, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code deleteKVConfigValue} with this
+     * instance's timeout.
+     *
+     * @param namespace KV namespace to delete from
+     * @param key key to delete
+     */
+    @Override
+    public void deleteKvConfig(String namespace, String key) throws 
RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        
this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, 
timeoutMillis);
+    }
+
+    @Override
+    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, 
String topic, long timestamp, boolean force)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+        List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
+        Map<String, Integer> topicRouteMap = new HashMap<String, Integer>();
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            for (QueueData queueData : topicRouteData.getQueueDatas()) {
+                topicRouteMap.put(bd.selectBrokerAddr(), 
queueData.getReadQueueNums());
+            }
+        }
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String addr = bd.selectBrokerAddr();
+            if (addr != null) {
+                ConsumeStats consumeStats = 
this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, 
timeoutMillis);
+
+                boolean hasConsumed = false;
+                for (Map.Entry<MessageQueue, OffsetWrapper> entry : 
consumeStats.getOffsetTable().entrySet()) {
+                    MessageQueue queue = entry.getKey();
+                    OffsetWrapper offsetWrapper = entry.getValue();
+                    if (topic.equals(queue.getTopic())) {
+                        hasConsumed = true;
+                        RollbackStats rollbackStats = 
resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, 
force);
+                        rollbackStatsList.add(rollbackStats);
+                    }
+                }
+
+                if (!hasConsumed) {
+                    HashMap<MessageQueue, TopicOffset> topicStatus =
+                            
this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, 
timeoutMillis).getOffsetTable();
+                    for (int i = 0; i < topicRouteMap.get(addr); i++) {
+                        MessageQueue queue = new MessageQueue(topic, 
bd.getBrokerName(), i);
+                        OffsetWrapper offsetWrapper = new OffsetWrapper();
+                        
offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset());
+                        
offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset());
+
+                        RollbackStats rollbackStats = 
resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, 
force);
+                        rollbackStatsList.add(rollbackStats);
+                    }
+                }
+            }
+        }
+        return rollbackStatsList;
+    }
+
+    /**
+     * Convenience overload: same as the five-argument
+     * {@code resetOffsetByTimestamp} with {@code isC} set to {@code false}.
+     *
+     * @param topic topic whose offsets are reset
+     * @param group consumer group concerned
+     * @param timestamp target timestamp
+     * @param isForce passed through to the brokers
+     * @return the merged offset table returned by the brokers
+     */
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp, boolean isForce)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
+    }
+
+    /**
+     * Resets offsets through the brokers (forced) and, when that fails because the
+     * consumer is not online, falls back to {@code resetOffsetByTimestampOld}
+     * (also forced). Any other client exception is rethrown unchanged.
+     *
+     * @param consumerGroup group whose offsets are reset
+     * @param topic topic whose offsets are reset
+     * @param timestamp target timestamp
+     */
+    @Override
+    public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        try {
+            this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
+        } catch (MQClientException e) {
+            if (ResponseCode.CONSUMER_NOT_ONLINE != e.getResponseCode()) {
+                throw e;
+            }
+            // Consumer offline: fall back to the older reset path.
+            this.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, true);
+        }
+    }
+
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp, boolean isForce, boolean isC)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+        Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, 
Long>();
+        if (brokerDatas != null) {
+            for (BrokerData brokerData : brokerDatas) {
+                String addr = brokerData.selectBrokerAddr();
+                if (addr != null) {
+                    Map<MessageQueue, Long> offsetTable =
+                            
this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, 
topic, group, timestamp, isForce,
+                                    timeoutMillis, isC);
+                    if (offsetTable != null) {
+                        allOffsetTable.putAll(offsetTable);
+                    }
+                }
+            }
+        }
+        return allOffsetTable;
+    }
+
+    private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String 
consumeGroup, MessageQueue queue, OffsetWrapper offsetWrapper,
+                                                   long timestamp, boolean 
force) throws RemotingException, InterruptedException, MQBrokerException {
+        long resetOffset;
+        if (timestamp == -1) {
+
+            resetOffset = 
this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, 
queue.getTopic(), queue.getQueueId(), timeoutMillis);
+        } else {
+            resetOffset =
+                    
this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, 
queue.getTopic(), queue.getQueueId(), timestamp,
+                            timeoutMillis);
+        }
+
+
+        RollbackStats rollbackStats = new RollbackStats();
+        rollbackStats.setBrokerName(queue.getBrokerName());
+        rollbackStats.setQueueId(queue.getQueueId());
+        rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset());
+        rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset());
+        rollbackStats.setTimestampOffset(resetOffset);
+        rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset());
+
+        if (force || resetOffset <= offsetWrapper.getConsumerOffset()) {
+            rollbackStats.setRollbackOffset(resetOffset);
+            UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
+            requestHeader.setConsumerGroup(consumeGroup);
+            requestHeader.setTopic(queue.getTopic());
+            requestHeader.setQueueId(queue.getQueueId());
+            requestHeader.setCommitOffset(resetOffset);
+            
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, 
requestHeader, timeoutMillis);
+        }
+        return rollbackStats;
+    }
+
+    /**
+     * Asks the first broker in the route of {@code topic} for the consumer status
+     * of {@code group}.
+     *
+     * @param topic topic whose route supplies the broker
+     * @param group consumer group concerned
+     * @param clientAddr passed through to the broker query
+     * @return the status returned by the broker, or an immutable empty map when
+     *         the route has no broker or the first broker has no selectable address
+     */
+    @Override
+    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+        if (brokerDatas != null && !brokerDatas.isEmpty()) {
+            String addr = brokerDatas.get(0).selectBrokerAddr();
+            if (addr != null) {
+                return this.mqClientInstance.getMQClientAPIImpl()
+                        .invokeBrokerToGetConsumerStatus(addr, topic, group, clientAddr, timeoutMillis);
+            }
+        }
+        // Typed factory instead of the raw Collections.EMPTY_MAP, which needed an unchecked conversion.
+        return Collections.<String, Map<MessageQueue, Long>>emptyMap();
+    }
+
+    /**
+     * Writes an order configuration value under the order-topic-config namespace.
+     * <p>
+     * When {@code isCluster} is true, {@code value} replaces the stored value as
+     * is. Otherwise the stored value is read, split on ';' into entries keyed by
+     * the text before the first ':', the entry with the same key as {@code value}
+     * is replaced or added, and the entries are joined with ';' and written back.
+     * The order of the joined entries follows HashMap iteration order.
+     *
+     * @param key KV key to write
+     * @param value new value, or a single "name:..." entry when {@code isCluster} is false
+     * @param isCluster whether to overwrite the whole value instead of merging one entry
+     */
+    public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+
+        if (isCluster) {
+            this.mqClientInstance.getMQClientAPIImpl()
+                    .putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, value, timeoutMillis);
+        } else {
+            String oldOrderConfs = null;
+            try {
+                oldOrderConfs = this.mqClientInstance.getMQClientAPIImpl()
+                        .getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, timeoutMillis);
+            } catch (Exception e) {
+                // Best effort: a failed read is treated as "no existing value", but it is reported
+                // through the class logger instead of being dumped to stderr.
+                log.warn("fetch the existing order conf failed, treat it as empty. key={}", key, e);
+            }
+
+            Map<String, String> orderConfMap = new HashMap<String, String>();
+            if (!UtilAll.isBlank(oldOrderConfs)) {
+                String[] oldOrderConfArr = oldOrderConfs.split(";");
+                for (String oldOrderConf : oldOrderConfArr) {
+                    String[] items = oldOrderConf.split(":");
+                    orderConfMap.put(items[0], oldOrderConf);
+                }
+            }
+            // The new entry replaces any existing entry with the same leading name.
+            String[] items = value.split(":");
+            orderConfMap.put(items[0], value);
+
+            StringBuilder newOrderConf = new StringBuilder();
+            String splitor = "";
+            for (Map.Entry<String, String> entry : orderConfMap.entrySet()) {
+                newOrderConf.append(splitor).append(entry.getValue());
+                splitor = ";";
+            }
+            this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key,
+                    newOrderConf.toString(), timeoutMillis);
+        }
+    }
+
+    /**
+     * Asks the first broker in the route of {@code topic} which groups consume it.
+     * Only the first broker is considered; later brokers are never tried.
+     *
+     * @param topic topic to query
+     * @return the group list from the first broker, or {@code null} when the route
+     *         has no broker or the first broker has no selectable address
+     */
+    @Override
+    public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException,
+            RemotingException, MQClientException {
+        final TopicRouteData route = this.examineTopicRouteInfo(topic);
+
+        final Iterator<BrokerData> brokers = route.getBrokerDatas().iterator();
+        if (!brokers.hasNext()) {
+            return null;
+        }
+
+        final String firstBrokerAddr = brokers.next().selectBrokerAddr();
+        if (firstBrokerAddr == null) {
+            return null;
+        }
+        return this.mqClientInstance.getMQClientAPIImpl().queryTopicConsumeByWho(firstBrokerAddr, topic, timeoutMillis);
+    }
+
+    /**
+     * Collects the consume time spans of {@code group} on {@code topic} from every
+     * broker in the topic route that has a selectable address.
+     *
+     * @param topic topic to query
+     * @param group consumer group to query
+     * @return all time spans, in broker iteration order; empty if no broker answered
+     */
+    @Override
+    public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group)
+            throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        final List<QueueTimeSpan> collected = new ArrayList<QueueTimeSpan>();
+        final TopicRouteData route = this.examineTopicRouteInfo(topic);
+        for (BrokerData broker : route.getBrokerDatas()) {
+            final String brokerAddr = broker.selectBrokerAddr();
+            if (brokerAddr == null) {
+                continue;
+            }
+            collected.addAll(
+                    this.mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(brokerAddr, topic, group, timeoutMillis));
+        }
+        return collected;
+    }
+
+    /**
+     * Cleans expired consume queues on one cluster, or on every known cluster when
+     * {@code cluster} is null or empty. A broker exception is logged and not
+     * rethrown.
+     *
+     * @param cluster cluster name, or null/empty for all clusters
+     * @return the outcome of the last cluster processed; {@code false} if nothing
+     *         was processed or a broker exception occurred before any result
+     */
+    @Override
+    public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean lastOutcome = false;
+        try {
+            final ClusterInfo clusterInfo = examineBrokerClusterInfo();
+            if (cluster != null && !"".equals(cluster)) {
+                lastOutcome = cleanExpiredConsumerQueueByCluster(clusterInfo, cluster);
+            } else {
+                // No cluster given: walk every cluster; each result overwrites the previous one.
+                for (String eachCluster : clusterInfo.retrieveAllClusterNames()) {
+                    lastOutcome = cleanExpiredConsumerQueueByCluster(clusterInfo, eachCluster);
+                }
+            }
+        } catch (MQBrokerException e) {
+            log.error("cleanExpiredConsumerQueue error.", e);
+        }
+
+        return lastOutcome;
+    }
+
+    public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, 
String cluster) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
MQClientException, InterruptedException {
+        boolean result = false;
+        String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
+        for (String addr : addrs) {
+            result = cleanExpiredConsumerQueueByAddr(addr);
+        }
+        return result;
+    }
+
+    /**
+     * Delegates to the client API's {@code cleanExpiredConsumeQueue} on the broker
+     * at {@code addr} with this instance's timeout, and logs the outcome at WARN
+     * level.
+     *
+     * @param addr address of the target broker
+     * @return the boolean returned by the client API
+     */
+    @Override
+    public boolean cleanExpiredConsumerQueueByAddr(String addr) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean result = 
mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, 
timeoutMillis);
+        log.warn("clean expired ConsumeQueue on target " + addr + " broker " + 
result);
+        return result;
+    }
+
+    /**
+     * Cleans unused topics on one cluster, or on every known cluster when
+     * {@code cluster} is null or empty. A broker exception is logged and not
+     * rethrown.
+     *
+     * @param cluster cluster name, or null/empty for all clusters
+     * @return the outcome of the last cluster processed; {@code false} if nothing
+     *         was processed or a broker exception occurred before any result
+     */
+    @Override
+    public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean result = false;
+        try {
+            ClusterInfo clusterInfo = examineBrokerClusterInfo();
+            if (null == cluster || "".equals(cluster)) {
+                for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
+                    result = cleanUnusedTopicByCluster(clusterInfo, targetCluster);
+                }
+            } else {
+                result = cleanUnusedTopicByCluster(clusterInfo, cluster);
+            }
+        } catch (MQBrokerException e) {
+            // The message used to say "cleanExpiredConsumerQueue", copied from the sibling method.
+            log.error("cleanUnusedTopic error.", e);
+        }
+
+        return result;
+    }
+
+    public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo, String 
cluster) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
MQClientException, InterruptedException {
+        boolean result = false;
+        String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
+        for (String addr : addrs) {
+            result = cleanUnusedTopicByAddr(addr);
+        }
+        return result;
+    }
+
+    /**
+     * Delegates to the client API's {@code cleanUnusedTopicByAddr} on the broker at
+     * {@code addr} with this instance's timeout, and logs the outcome at WARN level.
+     *
+     * @param addr address of the target broker
+     * @return the boolean returned by the client API
+     */
+    @Override
+    public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis);
+        // The message used to say "clean expired ConsumeQueue", copied from the sibling method.
+        log.warn("clean unused topic on target {} broker {}", addr, result);
+        return result;
+    }
+
+    /**
+     * Fetches the running info of one consumer client from the first broker, in
+     * the route of the group's retry topic, that has a selectable address. The
+     * broker is queried with three times the normal timeout.
+     *
+     * @param consumerGroup consumer group of the client
+     * @param clientId id of the client to inspect
+     * @param jstack passed through to the broker query
+     * @return the running info, or {@code null} when no broker with an address exists
+     */
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
+            throws RemotingException, MQClientException, InterruptedException {
+        final String retryTopic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
+        final List<BrokerData> brokers = this.examineTopicRouteInfo(retryTopic).getBrokerDatas();
+        if (brokers == null) {
+            return null;
+        }
+
+        for (BrokerData broker : brokers) {
+            final String brokerAddr = broker.selectBrokerAddr();
+            if (brokerAddr == null) {
+                continue;
+            }
+            // The first broker with an address answers; the others are not asked.
+            return this.mqClientInstance.getMQClientAPIImpl()
+                    .getConsumerRunningInfo(brokerAddr, consumerGroup, clientId, jstack, timeoutMillis * 3);
+        }
+        return null;
+    }
+
+    /**
+     * Looks the message up by {@code msgId} and asks the broker that stores it (the message's store host)
+     * to have {@code clientId} of {@code consumerGroup} consume it directly, using three times the default
+     * timeout.
+     */
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        final MessageExt message = this.viewMessage(msgId);
+        final String storeAddr = RemotingUtil.socketAddress2String(message.getStoreHost());
+        return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(storeAddr, consumerGroup, clientId,
+                msgId, timeoutMillis * 3);
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String 
consumerGroup, final String clientId, final String topic, final String msgId) 
throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        MessageExt msg = this.viewMessage(topic, msgId);
+        if 
(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+            return 
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
+                    consumerGroup, clientId, msgId, timeoutMillis * 3);
+        } else {
+            MessageClientExt msgClient = (MessageClientExt) msg;
+            return 
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
+                    consumerGroup, clientId, msgClient.getOffsetMsgId(), 
timeoutMillis * 3);
+        }
+    }
+
+    /**
+     * Builds one {@link MessageTrack} per consumer group returned by
+     * {@code queryTopicConsumeByWho(msg.getTopic())}, describing how that group relates to {@code msg}.
+     * <p>
+     * Each track starts as {@code UNKNOWN}. Failures while inspecting a group are recorded in the track's
+     * exception description instead of being propagated, and processing moves on to the next group.
+     *
+     * @param msg the message to trace
+     * @return one track per group, in the iteration order of the group list
+     */
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg) throws 
RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        List<MessageTrack> result = new ArrayList<MessageTrack>();
+
+        GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
+
+        for (String group : groupList.getGroupList()) {
+
+            MessageTrack mt = new MessageTrack();
+            mt.setConsumerGroup(group);
+            mt.setTrackType(TrackType.UNKNOWN);
+            ConsumerConnection cc = null;
+            try {
+                cc = this.examineConsumerConnectionInfo(group);
+            } catch (MQBrokerException e) {
+                // Only the CONSUMER_NOT_ONLINE code changes the type; any other broker error stays UNKNOWN.
+                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                    mt.setTrackType(TrackType.NOT_ONLINE);
+                }
+                mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + 
e.getErrorMessage());
+                result.add(mt);
+                continue;
+            } catch (Exception e) {
+                mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
+                result.add(mt);
+                continue;
+            }
+
+            // Note: every `continue` inside this switch targets the enclosing for loop, so the
+            // result.add(mt) after the switch is skipped (the track was already added in the catch block).
+            switch (cc.getConsumeType()) {
+                case CONSUME_ACTIVELY:
+                    mt.setTrackType(TrackType.PULL);
+                    break;
+                case CONSUME_PASSIVELY:
+                    boolean ifConsumed = false;
+                    try {
+                        ifConsumed = this.consumed(msg, group);
+                    } catch (MQClientException e) {
+                        if (ResponseCode.CONSUMER_NOT_ONLINE == 
e.getResponseCode()) {
+                            mt.setTrackType(TrackType.NOT_ONLINE);
+                        }
+                        mt.setExceptionDesc("CODE:" + e.getResponseCode() + " 
DESC:" + e.getErrorMessage());
+                        result.add(mt);
+                        continue;
+                    } catch (MQBrokerException e) {
+                        if (ResponseCode.CONSUMER_NOT_ONLINE == 
e.getResponseCode()) {
+                            mt.setTrackType(TrackType.NOT_ONLINE);
+                        }
+                        mt.setExceptionDesc("CODE:" + e.getResponseCode() + " 
DESC:" + e.getErrorMessage());
+                        result.add(mt);
+                        continue;
+                    } catch (Exception e) {
+                        
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
+                        result.add(mt);
+                        continue;
+                    }
+
+                    if (ifConsumed) {
+                        mt.setTrackType(TrackType.CONSUMED);
+                        // Downgrade to CONSUMED_BUT_FILTERED when the group's subscription for this topic
+                        // has a non-empty tag set that contains neither the message's tags nor "*".
+                        Iterator<Entry<String, SubscriptionData>> it = 
cc.getSubscriptionTable().entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<String, SubscriptionData> next = it.next();
+                            if (next.getKey().equals(msg.getTopic())) {
+                                if 
(next.getValue().getTagsSet().contains(msg.getTags())
+                                        || 
next.getValue().getTagsSet().contains("*")
+                                        || 
next.getValue().getTagsSet().isEmpty()) {
+                                } else {
+                                    
mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
+                                }
+                            }
+                        }
+                    } else {
+                        mt.setTrackType(TrackType.NOT_CONSUME_YET);
+                    }
+                    break;
+                default:
+                    // Any other consume type leaves the track as UNKNOWN.
+                    break;
+            }
+            result.add(mt);
+        }
+
+        return result;
+    }
+
+    /**
+     * Tells whether {@code group} has already moved past {@code msg} on the queue that stores it.
+     * <p>
+     * Walks the group's offset table looking for entries whose topic and queue id match the message and
+     * whose broker master address (id {@code MixAll.MASTER_ID}) equals the message's store host. Returns
+     * {@code true} as soon as such an entry has a consumer offset greater than the message's queue offset.
+     *
+     * @param msg   message to check
+     * @param group consumer group to check
+     * @return {@code true} if a matching queue's consumer offset is beyond the message, else {@code false}
+     */
+    public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException,
+            InterruptedException, MQBrokerException {
+
+        ConsumeStats cstats = this.examineConsumeStats(group);
+
+        ClusterInfo ci = this.examineBrokerClusterInfo();
+
+        for (Entry<MessageQueue, OffsetWrapper> next : cstats.getOffsetTable().entrySet()) {
+            MessageQueue mq = next.getKey();
+            if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
+                BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
+                if (brokerData != null) {
+                    String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    // A broker entry without a master address used to cause a NullPointerException here;
+                    // such an entry simply cannot match the store host, so it is skipped.
+                    if (addr != null && addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
+                        if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
+                            return true;
+                        }
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Sends a clone-group-offset request (from {@code srcGroup} to {@code destGroup} for {@code topic}) to
+     * every broker in the route of {@code srcGroup}'s retry topic for which {@code selectBrokerAddr()}
+     * returns a non-null address. Each request uses three times the default timeout.
+     */
+    @Override
+    public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        final TopicRouteData retryRoute = this.examineTopicRouteInfo(MixAll.getRetryTopic(srcGroup));
+
+        for (BrokerData broker : retryRoute.getBrokerDatas()) {
+            final String brokerAddr = broker.selectBrokerAddr();
+            if (brokerAddr == null) {
+                continue;
+            }
+            this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(brokerAddr, srcGroup, destGroup, topic,
+                    isOffline, timeoutMillis * 3);
+        }
+    }
+
+    @Override
+    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String 
statsName, String statsKey) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, 
statsName, statsKey, timeoutMillis);
+    }
+
+    @Override
+    public Set<String> getClusterList(String topic) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getClusterList(topic, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code fetchConsumeStatsInBroker}. Note that the {@code timeoutMillis}
+     * parameter is what gets passed on; it shadows the field of the same name.
+     */
+    @Override
+    public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            MQClientException, InterruptedException {
+        final ConsumeStatsList statsList =
+                this.mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker(brokerAddr, isOrder,
+                        timeoutMillis);
+        return statsList;
+    }
+
+    /**
+     * Returns the names of the clusters whose entry in the cluster address table contains the name of the
+     * first broker listed in the topic's route.
+     * <p>
+     * NOTE(review): only the first broker of the route is considered; a topic whose brokers span several
+     * clusters would presumably report only some of them. Confirm whether that is intended.
+     *
+     * @param topic topic whose route is examined
+     * @return the matching cluster names; empty when the route lists no brokers
+     */
+    @Override
+    public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException,
+            MQClientException, RemotingException {
+        Set<String> clusterSet = new HashSet<String>();
+        ClusterInfo clusterInfo = examineBrokerClusterInfo();
+        TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
+        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+        // Previously get(0) was called unconditionally and failed with an unchecked exception when the
+        // route carried no broker list or an empty one.
+        if (brokerDatas == null || brokerDatas.isEmpty()) {
+            return clusterSet;
+        }
+        String brokerName = brokerDatas.get(0).getBrokerName();
+        for (Map.Entry<String, Set<String>> next : clusterInfo.getClusterAddrTable().entrySet()) {
+            if (next.getValue().contains(brokerName)) {
+                clusterSet.add(next.getKey());
+            }
+        }
+        return clusterSet;
+    }
+
+    @Override
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr, long timeoutMillis) throws InterruptedException,
+            RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, 
timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicGroup(final String 
brokerAddr, long timeoutMillis) throws InterruptedException,
+            RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, 
timeoutMillis);
+    }
+
+    /**
+     * Convenience overload: creates the topic with a topic system flag of {@code 0}.
+     */
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        final int noSysFlag = 0;
+        this.createTopic(key, newTopic, queueNum, noSysFlag);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
+        this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, 
queueNum, topicSysFlag);
+    }
+
+    /**
+     * Delegates to the admin implementation's {@code searchOffset(mq, timestamp)}.
+     */
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        final long offset = this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
+        return offset;
+    }
+
+    /**
+     * Delegates to the admin implementation's {@code maxOffset(mq)}.
+     */
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        final long offset = this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
+        return offset;
+    }
+
+    /**
+     * Delegates to the admin implementation's {@code minOffset(mq)}.
+     */
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        final long offset = this.mqClientInstance.getMQAdminImpl().minOffset(mq);
+        return offset;
+    }
+
+    /**
+     * Delegates to the admin implementation's {@code earliestMsgStoreTime(mq)}.
+     */
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        final long storeTime = this.mqClientInstance.getMQAdminImpl().earliestMsgStoreTime(mq);
+        return storeTime;
+    }
+
+    @Override
+    public MessageExt viewMessage(String msgId) throws RemotingException, 
MQBrokerException, InterruptedException, MQClientException {
+        return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end) throws MQClientException,
+            InterruptedException {
+        return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, 
maxNum, begin, end);
+    }
+
+    /**
+     * Builds an {@code UpdateConsumerOffsetRequestHeader} from the group, the queue's topic and id, and
+     * the new offset, then sends it to {@code brokerAddr} with the default timeout.
+     */
+    @Override
+    public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset)
+            throws RemotingException, InterruptedException, MQBrokerException {
+        final UpdateConsumerOffsetRequestHeader header = new UpdateConsumerOffsetRequestHeader();
+        header.setConsumerGroup(consumeGroup);
+        header.setTopic(mq.getTopic());
+        header.setQueueId(mq.getQueueId());
+        header.setCommitOffset(offset);
+
+        this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, header, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code updateNameServerConfig} with the default timeout.
+     */
+    @Override
+    public void updateNameServerConfig(final Properties properties, final List<String> nameServers)
+            throws InterruptedException, RemotingConnectException, UnsupportedEncodingException,
+            RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
+        mqClientInstance.getMQClientAPIImpl().updateNameServerConfig(properties, nameServers, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the client API's {@code getNameServerConfig} with the default timeout.
+     */
+    @Override
+    public Map<String, Properties> getNameServerConfig(final List<String> nameServers)
+            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQClientException, UnsupportedEncodingException {
+        final Map<String, Properties> configs =
+                this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis);
+        return configs;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java
new file mode 100644
index 0000000..0075983
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.admin;
+
+import com.alibaba.rocketmq.client.MQAdmin;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.RollbackStats;
+import com.alibaba.rocketmq.common.admin.TopicStatsTable;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Administrative operations layered on top of {@link MQAdmin}: broker and name server configuration,
+ * topic and subscription-group management, consume-offset manipulation, and runtime inspection of
+ * brokers, producers and consumers.
+ * <p>
+ * The declarations here only fix signatures; the behaviour is defined by the implementing class.
+ */
+public interface MQAdminExt extends MQAdmin {
+    void start() throws MQClientException;
+
+    void shutdown();
+
+    void updateBrokerConfig(final String brokerAddr, final Properties 
properties) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
UnsupportedEncodingException, InterruptedException, MQBrokerException;
+
+    Properties getBrokerConfig(final String brokerAddr) throws 
RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
UnsupportedEncodingException, InterruptedException, MQBrokerException;
+
+    void createAndUpdateTopicConfig(final String addr, final TopicConfig 
config) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    void createAndUpdateSubscriptionGroupConfig(final String addr, final 
SubscriptionGroupConfig config) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException;
+
+    // Unlike their neighbours, the two examine*Config methods below declare no checked exceptions.
+    SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, 
final String group);
+
+    TopicConfig examineTopicConfig(final String addr, final String topic);
+
+    TopicStatsTable examineTopicStats(final String topic) throws 
RemotingException, MQClientException, InterruptedException,
+            MQBrokerException;
+
+    TopicList fetchAllTopicList() throws RemotingException, MQClientException, 
InterruptedException;
+
+    // NOTE(review): "CLuster" looks like a capitalisation typo; renaming it would break implementers
+    // and callers, so it is left as declared.
+    TopicList fetchTopicsByCLuster(String clusterName) throws 
RemotingException, MQClientException, InterruptedException;
+
+    KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException;
+
+    ConsumeStats examineConsumeStats(final String consumerGroup) throws 
RemotingException, MQClientException, InterruptedException,
+            MQBrokerException;
+
+    ConsumeStats examineConsumeStats(final String consumerGroup, final String 
topic) throws RemotingException, MQClientException,
+            InterruptedException, MQBrokerException;
+
+    ClusterInfo examineBrokerClusterInfo() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException;
+
+    TopicRouteData examineTopicRouteInfo(final String topic) throws 
RemotingException, MQClientException, InterruptedException;
+
+    ConsumerConnection examineConsumerConnectionInfo(final String 
consumerGroup) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException, MQBrokerException, RemotingException,
+            MQClientException;
+
+    ProducerConnection examineProducerConnectionInfo(final String 
producerGroup, final String topic) throws RemotingException,
+            MQClientException, InterruptedException, MQBrokerException;
+
+    List<String> getNameServerAddressList();
+
+    int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) 
throws RemotingCommandException,
+            RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException;
+
+    // Declares no checked exceptions, in contrast to getKVConfig and createAndUpdateKvConfig.
+    void putKVConfig(final String namespace, final String key, final String 
value);
+
+    String getKVConfig(final String namespace, final String key) throws 
RemotingException, MQClientException, InterruptedException;
+
+    KVTable getKVListByNamespace(final String namespace) throws 
RemotingException, MQClientException, InterruptedException;
+
+    void deleteTopicInBroker(final Set<String> addrs, final String topic) 
throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    void deleteTopicInNameServer(final Set<String> addrs, final String topic) 
throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    void deleteSubscriptionGroup(final String addr, String groupName) throws 
RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    void createAndUpdateKvConfig(String namespace, String key, String value) 
throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    void deleteKvConfig(String namespace, String key) throws 
RemotingException, MQBrokerException, InterruptedException,
+            MQClientException;
+
+    List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String 
topic, long timestamp, boolean force)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
+
+    // Parameter order here is (topic, group), whereas resetOffsetByTimestampOld and resetOffsetNew
+    // take (consumerGroup, topic).
+    Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, 
long timestamp, boolean isForce)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
+
+    void resetOffsetNew(String consumerGroup, String topic, long timestamp) 
throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String 
group, String clientAddr) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException;
+
+    void createOrUpdateOrderConf(String key, String value, boolean isCluster) 
throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+    GroupList queryTopicConsumeByWho(final String topic) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException, 
RemotingException, MQClientException;
+
+    List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String 
group) throws InterruptedException, MQBrokerException,
+            RemotingException, MQClientException;
+
+    boolean cleanExpiredConsumerQueue(String cluster) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException;
+
+    boolean cleanExpiredConsumerQueueByAddr(String addr) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException;
+
+    // NOTE(review): DefaultMQAdminExtImpl treats a null or empty cluster as "all clusters" —
+    // presumably that is the intended contract here; confirm.
+    boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, 
RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException;
+
+
+    boolean cleanUnusedTopicByAddr(String addr) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException;
+
+    ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, 
final String clientId, final boolean jstack)
+            throws RemotingException, MQClientException, InterruptedException;
+
+    ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
+                                                        String clientId,
+                                                        String msgId) throws 
RemotingException, MQClientException, InterruptedException, MQBrokerException;
+
+    ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
+                                                        String clientId,
+                                                        String topic,
+                                                        String msgId) throws 
RemotingException, MQClientException, InterruptedException, MQBrokerException;
+
+    List<MessageTrack> messageTrackDetail(MessageExt msg) throws 
RemotingException, MQClientException, InterruptedException,
+            MQBrokerException;
+
+    void cloneGroupOffset(String srcGroup, String destGroup, String topic, 
boolean isOffline) throws RemotingException,
+            MQClientException, InterruptedException, MQBrokerException;
+
+    BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String 
statsName, final String statsKey)
+            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException,
+            InterruptedException;
+
+    Set<String> getClusterList(final String topic) throws 
RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException;
+
+    ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, 
boolean isOrder, long timeoutMillis) throws RemotingConnectException, 
RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException;
+
+    Set<String> getTopicClusterList(final String topic) throws 
InterruptedException, MQBrokerException, MQClientException, RemotingException;
+
+    SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, 
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException;
+
+    // Despite the "Group" in its name, this returns a TopicConfigSerializeWrapper.
+    TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long 
timeoutMillis) throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException;
+
+    void updateConsumeOffset(String brokerAddr, String consumeGroup, 
MessageQueue mq, long offset) throws RemotingException, InterruptedException, 
MQBrokerException;
+
+    /**
+     * Update name server config.
+     * <br>
+     * Command Code : RequestCode.UPDATE_NAMESRV_CONFIG
+     *
+     * <br> If param(nameServers) is null or empty, will use name servers from 
ns!
+     *
+     * @param properties
+     * @param nameServers
+     *
+     * @throws InterruptedException
+     * @throws RemotingConnectException
+     * @throws UnsupportedEncodingException
+     * @throws RemotingSendRequestException
+     * @throws RemotingTimeoutException
+     * @throws MQClientException
+     * @throws MQBrokerException
+     */
+    void updateNameServerConfig(final Properties properties, final 
List<String> nameServers) throws InterruptedException, RemotingConnectException,
+            UnsupportedEncodingException, RemotingSendRequestException, 
RemotingTimeoutException,
+            MQClientException, MQBrokerException;
+
+    /**
+     * Get name server config.
+     * <br>
+     * Command Code : RequestCode.GET_NAMESRV_CONFIG
+     * <br> If param(nameServers) is null or empty, will use name servers from 
ns!
+     *
+     * @param nameServers
+     *
+     * @return The fetched name server config
+     *
+     * @throws InterruptedException
+     * @throws RemotingTimeoutException
+     * @throws RemotingSendRequestException
+     * @throws RemotingConnectException
+     * @throws MQClientException
+     * @throws UnsupportedEncodingException
+     */
+    Map<String, Properties> getNameServerConfig(final List<String> 
nameServers) throws InterruptedException,
+            RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException,
+            MQClientException, UnsupportedEncodingException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
new file mode 100644
index 0000000..bdc7288
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.admin.api;
+
+/**
+ * Mutable holder describing how one consumer group relates to a message: the group name, a
+ * {@link TrackType} classification and an optional description of an error met while determining it.
+ */
+public class MessageTrack {
+    /** Name of the consumer group this track is about. */
+    private String consumerGroup;
+    /** Classification of the group's state with respect to the message. */
+    private TrackType trackType;
+    /** Description of a failure encountered while computing the track; may be unset. */
+    private String exceptionDesc;
+
+
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public TrackType getTrackType() {
+        return this.trackType;
+    }
+
+
+    public void setTrackType(TrackType trackType) {
+        this.trackType = trackType;
+    }
+
+
+    public String getExceptionDesc() {
+        return this.exceptionDesc;
+    }
+
+
+    public void setExceptionDesc(String exceptionDesc) {
+        this.exceptionDesc = exceptionDesc;
+    }
+
+
+    /**
+     * Renders all three fields in the form
+     * {@code MessageTrack [consumerGroup=..., trackType=..., exceptionDesc=...]}.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("MessageTrack [consumerGroup=");
+        text.append(this.consumerGroup);
+        text.append(", trackType=").append(this.trackType);
+        text.append(", exceptionDesc=").append(this.exceptionDesc);
+        text.append("]");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
new file mode 100644
index 0000000..ca475ac
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.admin.api;
+
+/**
+ * Classification stored in a {@code MessageTrack}. The per-constant notes describe how
+ * {@code DefaultMQAdminExtImpl.messageTrackDetail} assigns each value.
+ */
+public enum TrackType {
+    // Passive-consume group whose offset is already past the message.
+    CONSUMED,
+    // As CONSUMED, but the group's subscription tags for the topic do not match the message's tags.
+    CONSUMED_BUT_FILTERED,
+    // Group whose consume type is CONSUME_ACTIVELY.
+    PULL,
+    // Passive-consume group whose offset has not passed the message.
+    NOT_CONSUME_YET,
+    // A lookup failed with response code CONSUMER_NOT_ONLINE.
+    NOT_ONLINE,
+    // Initial value; kept when none of the cases above applies.
+    UNKNOWN
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
new file mode 100644
index 0000000..1b5d264
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.MQAdminExt;
+
+import java.util.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CommandUtil {
+
+    public static Map<String/*master addr*/, List<String>/*slave addr*/> 
fetchMasterAndSlaveDistinguish(
+            final MQAdminExt adminExt, final String clusterName)
+            throws InterruptedException, RemotingConnectException,
+            RemotingTimeoutException, RemotingSendRequestException,
+            MQBrokerException {
+        Map<String, List<String>> masterAndSlaveMap = new HashMap<String, 
List<String>>(4);
+
+        ClusterInfo clusterInfoSerializeWrapper = 
adminExt.examineBrokerClusterInfo();
+        Set<String> brokerNameSet = 
clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+
+        if (brokerNameSet == null) {
+            System.out
+                    .printf("[error] Make sure the specified clusterName 
exists or the nameserver which connected is correct.");
+            return masterAndSlaveMap;
+        }
+
+        for (String brokerName : brokerNameSet) {
+            BrokerData brokerData = 
clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+
+            if (brokerData == null || brokerData.getBrokerAddrs() == null) {
+                continue;
+            }
+
+            String masterAddr = 
brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+            masterAndSlaveMap.put(masterAddr, new ArrayList<String>());
+
+            for (Long id : brokerData.getBrokerAddrs().keySet()) {
+                if (brokerData.getBrokerAddrs().get(id) == null
+                        || id.longValue() == MixAll.MASTER_ID) {
+                    continue;
+                }
+
+                
masterAndSlaveMap.get(masterAddr).add(brokerData.getBrokerAddrs().get(id));
+            }
+        }
+
+        return masterAndSlaveMap;
+    }
+
+    public static Set<String> fetchMasterAddrByClusterName(final MQAdminExt 
adminExt, final String clusterName)
+            throws InterruptedException, RemotingConnectException, 
RemotingTimeoutException,
+            RemotingSendRequestException, MQBrokerException {
+        Set<String> masterSet = new HashSet<String>();
+
+        ClusterInfo clusterInfoSerializeWrapper = 
adminExt.examineBrokerClusterInfo();
+
+        Set<String> brokerNameSet = 
clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+
+        if (brokerNameSet != null) {
+            for (String brokerName : brokerNameSet) {
+                BrokerData brokerData = 
clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+                if (brokerData != null) {
+
+                    String addr = 
brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    if (addr != null) {
+                        masterSet.add(addr);
+                    }
+                }
+            }
+        } else {
+            System.out
+                    .printf("[error] Make sure the specified clusterName 
exists or the nameserver which connected is correct.");
+        }
+
+        return masterSet;
+    }
+
+    public static Set<String> fetchMasterAndSlaveAddrByClusterName(final 
MQAdminExt adminExt, final String clusterName)
+            throws InterruptedException, RemotingConnectException, 
RemotingTimeoutException,
+            RemotingSendRequestException, MQBrokerException {
+        Set<String> masterSet = new HashSet<String>();
+
+        ClusterInfo clusterInfoSerializeWrapper = 
adminExt.examineBrokerClusterInfo();
+
+        Set<String> brokerNameSet = 
clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+
+        if (brokerNameSet != null) {
+            for (String brokerName : brokerNameSet) {
+                BrokerData brokerData = 
clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+                if (brokerData != null) {
+                    final Collection<String> addrs = 
brokerData.getBrokerAddrs().values();
+                    masterSet.addAll(addrs);
+                }
+            }
+        } else {
+            System.out
+                    .printf("[error] Make sure the specified clusterName 
exists or the nameserver which connected is correct.");
+        }
+
+        return masterSet;
+    }
+
+
+    public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt 
adminExt, final String clusterName)
+            throws Exception {
+        ClusterInfo clusterInfoSerializeWrapper = 
adminExt.examineBrokerClusterInfo();
+        Set<String> brokerNameSet = 
clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+        if (brokerNameSet.isEmpty()) {
+            throw new Exception(
+                    "Make sure the specified clusterName exists or the 
nameserver which connected is correct.");
+        }
+        return brokerNameSet;
+    }
+
+
+    public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, 
final String addr) throws Exception {
+        ClusterInfo clusterInfoSerializeWrapper = 
adminExt.examineBrokerClusterInfo();
+        HashMap<String/* brokerName */, BrokerData> brokerAddrTable =
+                clusterInfoSerializeWrapper.getBrokerAddrTable();
+        Iterator<Map.Entry<String, BrokerData>> it = 
brokerAddrTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, BrokerData> entry = it.next();
+            HashMap<Long, String> brokerAddrs = 
entry.getValue().getBrokerAddrs();
+            if (brokerAddrs.containsValue(addr))
+                return entry.getKey();
+        }
+        throw new Exception(
+                "Make sure the specified broker addr exists or the nameserver 
which connected is correct.");
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
new file mode 100644
index 0000000..56ecc59
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.command.broker.*;
+import com.alibaba.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
+import com.alibaba.rocketmq.tools.command.cluster.ClusterListSubCommand;
+import 
com.alibaba.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
+import 
com.alibaba.rocketmq.tools.command.connection.ProducerConnectionSubCommand;
+import com.alibaba.rocketmq.tools.command.consumer.*;
+import com.alibaba.rocketmq.tools.command.message.*;
+import com.alibaba.rocketmq.tools.command.namesrv.*;
+import com.alibaba.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
+import com.alibaba.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
+import com.alibaba.rocketmq.tools.command.stats.StatsAllSubCommand;
+import com.alibaba.rocketmq.tools.command.topic.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Command-line entry point of the {@code mqadmin} tool: registers the
+ * available sub commands, then dispatches to the one named by the first
+ * argument.
+ *
+ * @author shijia.wxr
+ */
+public class MQAdminStartup {
+    /** Registered sub commands, in the order they are listed by the help output. */
+    protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
+
+    public static void main(String[] args) {
+        main0(args, null);
+    }
+
+    /**
+     * Runs the tool.
+     *
+     * <ul>
+     * <li>no argument: prints the list of sub commands;</li>
+     * <li>{@code help <command>}: prints the options of that sub command;</li>
+     * <li>anything else: treats the first argument as the sub command name and
+     * the remaining arguments as its options.</li>
+     * </ul>
+     *
+     * Any exception raised while doing so is printed to stderr and swallowed.
+     *
+     * @param args    raw command-line arguments
+     * @param rpcHook hook handed to the executed sub command; may be {@code null}
+     */
+    public static void main0(String[] args, RPCHook rpcHook) {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        initCommand();
+
+        try {
+            initLogback();
+            switch (args.length) {
+                case 0:
+                    printHelp();
+                    break;
+                case 2:
+                    if (args[0].equals("help")) {
+                        SubCommand cmd = findSubCommand(args[1]);
+                        if (cmd != null) {
+                            Options options = ServerUtil.buildCommandlineOptions(new Options());
+                            options = cmd.buildCommandlineOptions(options);
+                            if (options != null) {
+                                ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
+                            }
+                        } else {
+                            // Pass the name as an argument: user input must never
+                            // become part of the format string itself.
+                            System.out.printf("The sub command '%s' not exist.%n", args[1]);
+                        }
+                        break;
+                    }
+                    // Intentional fall-through: two arguments that are not
+                    // "help <command>" form a regular sub command invocation.
+                case 1:
+                default:
+                    SubCommand cmd = findSubCommand(args[0]);
+                    if (cmd != null) {
+                        String[] subargs = parseSubArgs(args);
+
+                        Options options = ServerUtil.buildCommandlineOptions(new Options());
+                        final CommandLine commandLine =
+                                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
+                                        cmd.buildCommandlineOptions(options), new PosixParser());
+                        if (null == commandLine) {
+                            System.exit(-1);
+                            return;
+                        }
+
+                        if (commandLine.hasOption('n')) {
+                            String namesrvAddr = commandLine.getOptionValue('n');
+                            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+                        }
+
+                        cmd.execute(commandLine, options, rpcHook);
+                    } else {
+                        System.out.printf("The sub command '%s' not exist.%n", args[0]);
+                    }
+                    break;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Registers the built-in sub commands. Each command is registered exactly
+     * once so that it shows up only once in the help listing.
+     */
+    public static void initCommand() {
+        initCommand(new UpdateTopicSubCommand());
+        initCommand(new DeleteTopicSubCommand());
+        initCommand(new UpdateSubGroupSubCommand());
+        initCommand(new DeleteSubscriptionGroupCommand());
+        initCommand(new UpdateBrokerConfigSubCommand());
+        initCommand(new UpdateTopicPermSubCommand());
+
+        initCommand(new TopicRouteSubCommand());
+        initCommand(new TopicStatusSubCommand());
+        initCommand(new TopicClusterSubCommand());
+
+
+        initCommand(new BrokerStatusSubCommand());
+        initCommand(new QueryMsgByIdSubCommand());
+        initCommand(new QueryMsgByKeySubCommand());
+        initCommand(new QueryMsgByUniqueKeySubCommand());
+        initCommand(new QueryMsgByOffsetSubCommand());
+        initCommand(new PrintMessageSubCommand());
+        initCommand(new PrintMessageByQueueCommand());
+        initCommand(new SendMsgStatusCommand());
+        initCommand(new BrokerConsumeStatsSubCommad());
+
+
+        initCommand(new ProducerConnectionSubCommand());
+        initCommand(new ConsumerConnectionSubCommand());
+        initCommand(new ConsumerProgressSubCommand());
+        initCommand(new ConsumerStatusSubCommand());
+        initCommand(new CloneGroupOffsetCommand());
+
+        initCommand(new ClusterListSubCommand());
+        initCommand(new TopicListSubCommand());
+
+        initCommand(new UpdateKvConfigCommand());
+        initCommand(new DeleteKvConfigCommand());
+
+        initCommand(new WipeWritePermSubCommand());
+        initCommand(new ResetOffsetByTimeCommand());
+
+        initCommand(new UpdateOrderConfCommand());
+        initCommand(new CleanExpiredCQSubCommand());
+        initCommand(new CleanUnusedTopicCommand());
+
+        initCommand(new StartMonitoringSubCommand());
+        initCommand(new StatsAllSubCommand());
+
+        initCommand(new AllocateMQSubCommand());
+
+        initCommand(new CheckMsgSendRTCommand());
+        initCommand(new CLusterSendMsgRTCommand());
+
+        initCommand(new GetNamesrvConfigCommand());
+        initCommand(new UpdateNamesrvConfigCommand());
+        initCommand(new GetBrokerConfigCommand());
+    }
+
+    /**
+     * Resets logback and configures it from {@code conf/logback_tools.xml}
+     * below the RocketMQ home directory, which is read from the system
+     * property and falls back to the environment variable.
+     */
+    private static void initLogback() throws JoranException {
+        String rocketmqHome =
+                System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        JoranConfigurator configurator = new JoranConfigurator();
+        configurator.setContext(lc);
+        lc.reset();
+        configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml");
+    }
+
+    /** Prints the name and description of every registered sub command. */
+    private static void printHelp() {
+        System.out.printf("The most commonly used mqadmin commands are:%n");
+
+        for (SubCommand cmd : subCommandList) {
+            System.out.printf("   %-20s %s%n", cmd.commandName(), cmd.commandDesc());
+        }
+
+        System.out.printf("%nSee 'mqadmin help <command>' for more information on a specific command.");
+    }
+
+    /**
+     * Looks a sub command up by name, ignoring case.
+     *
+     * @return the first registered command with that name, or {@code null}
+     */
+    private static SubCommand findSubCommand(final String name) {
+        for (SubCommand cmd : subCommandList) {
+            if (cmd.commandName().equalsIgnoreCase(name)) {
+                return cmd;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Strips the leading sub command name from the argument list.
+     *
+     * @return the remaining arguments, or {@code null} when there are none
+     */
+    private static String[] parseSubArgs(String[] args) {
+        if (args.length > 1) {
+            String[] result = new String[args.length - 1];
+            System.arraycopy(args, 1, result, 0, result.length);
+            return result;
+        }
+        return null;
+    }
+
+    /** Appends a sub command to the list of available commands. */
+    public static void initCommand(SubCommand command) {
+        subCommandList.add(command);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
new file mode 100644
index 0000000..e28b7bc
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * Contract implemented by every {@code mqadmin} sub command.
+ *
+ * @author shijia.wxr
+ */
+public interface SubCommand {
+    /**
+     * @return the name the command is invoked with on the command line
+     */
+    String commandName();
+
+    /**
+     * @return a short description shown in the help listing
+     */
+    String commandDesc();
+
+    /**
+     * Adds the command-specific options to the given option set.
+     *
+     * @param options option set to extend
+     * @return the option set to parse the command line with
+     */
+    Options buildCommandlineOptions(final Options options);
+
+    /**
+     * Runs the command.
+     *
+     * @param commandLine parsed command line
+     * @param options     option set passed in by the launcher
+     * @param rpcHook     RPC hook passed in by the launcher; may be {@code null}
+     */
+    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
new file mode 100644
index 0000000..0f24df7
--- /dev/null
+++ 
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.OffsetWrapper;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.ConsumeStatsList;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.*;
+
+
+/**
+ * Sub command {@code brokerConsumeStats}: fetches the consume statistics of
+ * one broker and prints, per topic / group / queue, the broker offset, the
+ * consumer offset and their difference.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerConsumeStatsSubCommad implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "brokerConsumeStats";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Fetch broker consume stats data";
+    }
+
+    /**
+     * Registers the options of this command: {@code -b} broker address
+     * (required), {@code -t} request timeout, {@code -l} minimum diff to
+     * print and {@code -o} order flag (all optional).
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "timeoutMillis", true, "request timeout Millis");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("l", "level", true, "threshold of print diff");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("o", "order", true, "order topic");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Queries the broker given by {@code -b} and prints one row per message
+     * queue whose diff is at least the {@code -l} threshold and whose last
+     * timestamp is positive, followed by the total diff reported by the broker.
+     * Failures are printed to stderr; the admin client is always shut down.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            defaultMQAdminExt.start();
+            String brokerAddr = commandLine.getOptionValue('b').trim();
+            boolean isOrder = false;
+            long timeoutMillis = 50000;
+            long diffLevel = 0;
+            if (commandLine.hasOption('o')) {
+                isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
+            }
+            if (commandLine.hasOption('t')) {
+                timeoutMillis = Long.parseLong(commandLine.getOptionValue('t').trim());
+            }
+            if (commandLine.hasOption('l')) {
+                diffLevel = Long.parseLong(commandLine.getOptionValue('l').trim());
+            }
+
+            ConsumeStatsList consumeStatsList =
+                    defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+            System.out.printf("%-32s  %-32s  %-32s  %-4s  %-20s  %-20s  %-20s  %s%n",
+                    "#Topic",
+                    "#Group",
+                    "#Broker Name",
+                    "#QID",
+                    "#Broker Offset",
+                    "#Consumer Offset",
+                    "#Diff",
+                    "#LastTime");
+            for (Map<String, List<ConsumeStats>> map : consumeStatsList.getConsumeStatsList()) {
+                for (Map.Entry<String, List<ConsumeStats>> entry : map.entrySet()) {
+                    String group = entry.getKey();
+                    for (ConsumeStats consumeStats : entry.getValue()) {
+                        // Copy the keys into an array-backed list so they can be sorted.
+                        List<MessageQueue> mqList =
+                                new ArrayList<MessageQueue>(consumeStats.getOffsetTable().keySet());
+                        Collections.sort(mqList);
+                        for (MessageQueue mq : mqList) {
+                            OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
+                            long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
+
+                            // Rows below the threshold or without a timestamp are
+                            // not printed, so skip them before formatting anything.
+                            if (diff < diffLevel || offsetWrapper.getLastTimestamp() <= 0) {
+                                continue;
+                            }
+
+                            String lastTime = "-";
+                            try {
+                                lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()),
+                                        UtilAll.YYYY_MM_DD_HH_MM_SS);
+                            } catch (Exception ignored) {
+                                // Best effort: keep the "-" placeholder if the
+                                // timestamp cannot be formatted.
+                            }
+
+                            System.out.printf("%-32s  %-32s  %-32s  %-4d  %-20d  %-20d  %-20d  %s%n",
+                                    UtilAll.frontStringAtLeast(mq.getTopic(), 32),
+                                    group,
+                                    UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+                                    mq.getQueueId(),
+                                    offsetWrapper.getBrokerOffset(),
+                                    offsetWrapper.getConsumerOffset(),
+                                    diff,
+                                    lastTime
+                            );
+                        }
+                    }
+                }
+            }
+            System.out.printf("%nDiff Total: %d%n", consumeStatsList.getTotalDiff());
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}


Reply via email to