http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
deleted file mode 100644
index 0eba3a5..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.console.service.client;
-
-import com.google.common.base.Throwables;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import org.apache.rocketmq.client.QueryResult;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQAdminImpl;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.admin.ConsumeStats;
-import org.apache.rocketmq.common.admin.RollbackStats;
-import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
-import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.common.protocol.body.ProducerConnection;
-import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import org.apache.rocketmq.common.protocol.body.TopicList;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.console.util.JsonUtil;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-import org.apache.rocketmq.tools.admin.api.MessageTrack;
-import org.joor.Reflect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import static 
org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
-
-@Service
-public class MQAdminExtImpl implements MQAdminExt {
-    private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
-
-    public MQAdminExtImpl() {
-    }
-
-    @Override
-    public void updateBrokerConfig(String brokerAddr, Properties properties)
-        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-        UnsupportedEncodingException, InterruptedException, MQBrokerException {
-        MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, 
properties);
-    }
-
-    @Override
-    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, 
config);
-    }
-
-    @Override
-    public void createAndUpdateSubscriptionGroupConfig(String addr, 
SubscriptionGroupConfig config)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr,
 config);
-    }
-
-    @Override
-    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, 
String group) {
-        RemotingClient remotingClient = 
MQAdminInstance.threadLocalRemotingClient();
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
-        RemotingCommand response = null;
-        try {
-            response = remotingClient.invokeSync(addr, request, 3000);
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                SubscriptionGroupWrapper subscriptionGroupWrapper = 
decode(response.getBody(), SubscriptionGroupWrapper.class);
-                return 
subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
-            }
-            default:
-                throw Throwables.propagate(new 
MQBrokerException(response.getCode(), response.getRemark()));
-        }
-    }
-
-    @Override
-    public TopicConfig examineTopicConfig(String addr, String topic) {
-        RemotingClient remotingClient = 
MQAdminInstance.threadLocalRemotingClient();
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
-        RemotingCommand response = null;
-        try {
-            response = remotingClient.invokeSync(addr, request, 3000);
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                TopicConfigSerializeWrapper topicConfigSerializeWrapper = 
decode(response.getBody(), TopicConfigSerializeWrapper.class);
-                return 
topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
-            }
-            default:
-                throw Throwables.propagate(new 
MQBrokerException(response.getCode(), response.getRemark()));
-        }
-    }
-
-    @Override
-    public TopicStatsTable examineTopicStats(String topic)
-        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
-    }
-
-    @Override
-    public TopicList fetchAllTopicList() throws RemotingException, 
MQClientException, InterruptedException {
-        TopicList topicList = 
MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
-        logger.debug("op=look={}", 
JsonUtil.obj2String(topicList.getTopicList()));
-        return topicList;
-    }
-
-    @Override
-    public KVTable fetchBrokerRuntimeStats(String brokerAddr)
-        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-        InterruptedException, MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
-    }
-
-    @Override
-    public ConsumeStats examineConsumeStats(String consumerGroup)
-        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
-    }
-
-    @Override
-    public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
-        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, 
topic);
-    }
-
-    @Override
-    public ClusterInfo examineBrokerClusterInfo()
-        throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
-    }
-
-    @Override
-    public TopicRouteData examineTopicRouteInfo(String topic)
-        throws RemotingException, MQClientException, InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
-    }
-
-    @Override
-    public ConsumerConnection examineConsumerConnectionInfo(String 
consumerGroup)
-        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-        InterruptedException, MQBrokerException, RemotingException, 
MQClientException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
-    }
-
-    @Override
-    public ProducerConnection examineProducerConnectionInfo(String 
producerGroup, String topic)
-        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup,
 topic);
-    }
-
-    @Override
-    public List<String> getNameServerAddressList() {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
-    }
-
-    @Override
-    public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
-        throws RemotingCommandException, RemotingConnectException, 
RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQClientException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, 
brokerName);
-    }
-
-    @Override
-    public void putKVConfig(String namespace, String key, String value) {
-        MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, 
value);
-    }
-
-    @Override
-    public String getKVConfig(String namespace, String key)
-        throws RemotingException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, 
key);
-    }
-
-    @Override
-    public KVTable getKVListByNamespace(String namespace)
-        throws RemotingException, MQClientException, InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
-    }
-
-    @Override
-    public void deleteTopicInBroker(Set<String> addrs, String topic)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
-        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, 
topic);
-    }
-
-    @Override
-    public void deleteTopicInNameServer(Set<String> addrs, String topic)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, 
topic);
-    }
-
-    @Override
-    public void deleteSubscriptionGroup(String addr, String groupName)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, 
groupName);
-    }
-
-    @Override
-    public void createAndUpdateKvConfig(String namespace, String key, String 
value)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, 
value);
-    }
-
-    @Override
-    public void deleteKvConfig(String namespace, String key)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
-    }
-
-    @Override
-    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, 
String topic, long timestamp,
-        boolean force) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup,
 topic, timestamp, force);
-    }
-
-    @Override
-    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp,
-        boolean isForce) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, 
timestamp, isForce);
-    }
-
-    @Override
-    public void resetOffsetNew(String consumerGroup, String topic, long 
timestamp)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, 
topic, timestamp);
-    }
-
-    @Override
-    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, 
String group,
-        String clientAddr) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, 
group, clientAddr);
-    }
-
-    @Override
-    public void createOrUpdateOrderConf(String key, String value, boolean 
isCluster)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, 
value, isCluster);
-    }
-
-    @Override
-    public GroupList queryTopicConsumeByWho(String topic)
-        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-        InterruptedException, MQBrokerException, RemotingException, 
MQClientException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
-    }
-
-    @Override
-    public boolean cleanExpiredConsumerQueue(String cluster)
-        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException,
-        InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
-    }
-
-    @Override
-    public boolean cleanExpiredConsumerQueueByAddr(String addr)
-        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException,
-        InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
-    }
-
-    @Override
-    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, 
String clientId, boolean jstack)
-        throws RemotingException, MQClientException, InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, 
clientId, jstack);
-    }
-
-    @Override
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(String 
consumerGroup, String clientId,
-        String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, 
clientId, msgId);
-    }
-
-    @Override
-    public List<MessageTrack> messageTrackDetail(MessageExt msg)
-        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
-    }
-
-    @Override
-    public void cloneGroupOffset(String srcGroup, String destGroup, String 
topic, boolean isOffline)
-        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
-        MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, 
destGroup, topic, isOffline);
-    }
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws 
MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, 
queueNum);
-    }
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag)
-        throws MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, 
queueNum, topicSysFlag);
-    }
-
-    @Override
-    public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, 
timestamp);
-    }
-
-    @Override
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
-    }
-
-    @Override
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
-    }
-
-    @Override
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
-        return 
MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
-    }
-
-    @Override
-    public MessageExt viewMessage(String msgId)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
-    }
-
-    @Override
-    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
-        throws MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, 
key, maxNum, begin, end);
-    }
-
-    @Override
-    @Deprecated
-    public void start() throws MQClientException {
-        throw new IllegalStateException("thisMethod is deprecated.use 
org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
-    }
-
-    @Override
-    @Deprecated
-    public void shutdown() {
-        throw new IllegalStateException("thisMethod is deprecated.use 
org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
-    }
-
-    // below is 3.2.6->3.5.8 updated
-
-    @Override
-    public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
-        String group) throws InterruptedException, MQBrokerException, 
RemotingException, MQClientException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
-    }
-
-    //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a 
day
-    //next version we will remove it
-    //https://issues.apache.org/jira/browse/ROCKETMQ-111
-    //https://github.com/apache/incubator-rocketmq/pull/69
-    @Override
-    public MessageExt viewMessage(String topic,
-        String msgId) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} 
msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
-        try {
-            return viewMessage(msgId);
-        }
-        catch (Exception e) {
-        }
-        MQAdminImpl mqAdminImpl = 
MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
-        QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, 
msgId, 32,
-            MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 
* 60 * 60 * 13L, Long.MAX_VALUE, true).get();
-        if (qr != null && qr.getMessageList() != null && 
qr.getMessageList().size() > 0) {
-            return qr.getMessageList().get(0);
-        }
-        else {
-            return null;
-        }
-    }
-
-    @Override
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(String 
consumerGroup, String clientId, String topic,
-        String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, 
clientId, topic, msgId);
-    }
-
-    @Override
-    public Properties getBrokerConfig(
-        String brokerAddr) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, 
UnsupportedEncodingException, InterruptedException, MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
-    }
-
-    @Override
-    public TopicList fetchTopicsByCLuster(
-        String clusterName) throws RemotingException, MQClientException, 
InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
-    }
-
-    @Override
-    public boolean cleanUnusedTopic(
-        String cluster) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
-    }
-
-    @Override
-    public boolean cleanUnusedTopicByAddr(
-        String addr) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
-    }
-
-    @Override
-    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String 
statsName,
-        String statsKey) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, 
statsName, statsKey);
-    }
-
-    @Override
-    public Set<String> getClusterList(
-        String topic) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
-    }
-
-    @Override
-    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, 
boolean isOrder,
-        long timeoutMillis) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, 
isOrder, timeoutMillis);
-    }
-
-    @Override
-    public Set<String> getTopicClusterList(
-        String topic) throws InterruptedException, MQBrokerException, 
MQClientException, RemotingException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
-    }
-
-    @Override
-    public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, 
timeoutMillis);
-    }
-
-    @Override
-    public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
-        return 
MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, 
timeoutMillis);
-    }
-
-    @Override
-    public void updateConsumeOffset(String brokerAddr, String consumeGroup, 
MessageQueue mq,
-        long offset) throws RemotingException, InterruptedException, 
MQBrokerException {
-        
MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, 
consumeGroup, mq, offset);
-    }
-
-    // 4.0.0 added
-    @Override public void updateNameServerConfig(Properties properties,
-        List<String> list) throws InterruptedException, 
RemotingConnectException, UnsupportedEncodingException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
MQBrokerException {
-
-    }
-
-    @Override public Map<String, Properties> getNameServerConfig(
-        List<String> list) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQClientException, UnsupportedEncodingException {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
deleted file mode 100644
index e914e6c..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.console.service.client;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-import org.joor.Reflect;
-
-public class MQAdminInstance {
-    private static final ThreadLocal<DefaultMQAdminExt> 
MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
-    private static final ThreadLocal<Integer> INIT_COUNTER = new 
ThreadLocal<Integer>();
-
-    public static MQAdminExt threadLocalMQAdminExt() {
-        DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
-        if (defaultMQAdminExt == null) {
-            throw new IllegalStateException("defaultMQAdminExt should be init 
before you get this");
-        }
-        return defaultMQAdminExt;
-    }
-
-    public static RemotingClient threadLocalRemotingClient() {
-        MQClientInstance mqClientInstance = threadLocalMqClientInstance();
-        MQClientAPIImpl mQClientAPIImpl = 
Reflect.on(mqClientInstance).get("mQClientAPIImpl");
-        return Reflect.on(mQClientAPIImpl).get("remotingClient");
-    }
-
-    public static MQClientInstance threadLocalMqClientInstance() {
-        DefaultMQAdminExtImpl defaultMQAdminExtImpl = 
Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
-        return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
-    }
-
-    public static void initMQAdminInstance(long timeoutMillis) throws 
MQClientException {
-        Integer nowCount = INIT_COUNTER.get();
-        if (nowCount == null) {
-            DefaultMQAdminExt defaultMQAdminExt;
-            if (timeoutMillis > 0) {
-                defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
-            }
-            else {
-                defaultMQAdminExt = new DefaultMQAdminExt();
-            }
-            
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-            defaultMQAdminExt.start();
-            MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
-            INIT_COUNTER.set(1);
-        }
-        else {
-            INIT_COUNTER.set(nowCount + 1);
-        }
-
-    }
-
-    public static void destroyMQAdminInstance() {
-        Integer nowCount = INIT_COUNTER.get() - 1;
-        if (nowCount > 0) {
-            INIT_COUNTER.set(nowCount);
-            return;
-        }
-        MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
-        if (mqAdminExt != null) {
-            mqAdminExt.shutdown();
-            MQ_ADMIN_EXT_THREAD_LOCAL.remove();
-            INIT_COUNTER.remove();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
deleted file mode 100644
index e225edc..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.console.service.impl;
-
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-import org.apache.rocketmq.console.service.ClusterService;
-import org.apache.rocketmq.console.util.JsonUtil;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.util.Map;
-import java.util.Properties;
-
-@Service
-public class ClusterServiceImpl implements ClusterService {
-    private Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
-    @Resource
-    private MQAdminExt mqAdminExt;
-
-    @Override
-    public Map<String, Object> list() {
-        try {
-            Map<String, Object> resultMap = Maps.newHashMap();
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            logger.info("op=look_clusterInfo {}", 
JsonUtil.obj2String(clusterInfo));
-            Map<String/*brokerName*/, Map<Long/* brokerId */, Object/* 
brokerDetail */>> brokerServer = Maps.newHashMap();
-            for (BrokerData brokerData : 
clusterInfo.getBrokerAddrTable().values()) {
-                Map<Long, Object> brokerMasterSlaveMap = Maps.newHashMap();
-                for (Map.Entry<Long/* brokerId */, String/* broker address */> 
brokerAddr : brokerData.getBrokerAddrs().entrySet()) {
-                    KVTable kvTable = 
mqAdminExt.fetchBrokerRuntimeStats(brokerAddr.getValue());
-//                KVTable kvTable = 
mqAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
-                    brokerMasterSlaveMap.put(brokerAddr.getKey(), 
kvTable.getTable());
-                }
-                brokerServer.put(brokerData.getBrokerName(), 
brokerMasterSlaveMap);
-            }
-            resultMap.put("clusterInfo", clusterInfo);
-            resultMap.put("brokerServer", brokerServer);
-            return resultMap;
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-    }
-
-
-    @Override
-    public Properties getBrokerConfig(String brokerAddr) {
-        try {
-            return mqAdminExt.getBrokerConfig(brokerAddr);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
deleted file mode 100644
index 715cbf5..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.console.service.impl;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.admin.ConsumeStats;
-import org.apache.rocketmq.common.admin.RollbackStats;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.Connection;
-import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import 
org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
-import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
-import org.apache.rocketmq.console.model.GroupConsumeInfo;
-import org.apache.rocketmq.console.model.QueueStatInfo;
-import org.apache.rocketmq.console.model.TopicConsumerInfo;
-import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
-import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
-import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
-import org.apache.rocketmq.console.service.AbstractCommonService;
-import org.apache.rocketmq.console.service.ConsumerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import static com.google.common.base.Throwables.propagate;
-
-@Service
-public class ConsumerServiceImpl extends AbstractCommonService implements 
ConsumerService {
-    private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public List<GroupConsumeInfo> queryGroupList() {
-        Set<String> consumerGroupSet = Sets.newHashSet();
-        try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            for (BrokerData brokerData : 
clusterInfo.getBrokerAddrTable().values()) {
-                SubscriptionGroupWrapper subscriptionGroupWrapper = 
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
-                
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
-            }
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-        List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
-        for (String consumerGroup : consumerGroupSet) {
-            groupConsumeInfoList.add(queryGroup(consumerGroup));
-        }
-        Collections.sort(groupConsumeInfoList);
-        return groupConsumeInfoList;
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public GroupConsumeInfo queryGroup(String consumerGroup) {
-        GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
-        try {
-            ConsumeStats consumeStats = null;
-            try {
-                consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
-            }
-            catch (Exception e) {
-                logger.warn("examineConsumeStats exception, " + consumerGroup, 
e);
-            }
-
-            ConsumerConnection consumerConnection = null;
-            try {
-                consumerConnection = 
mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
-            }
-            catch (Exception e) {
-                logger.warn("examineConsumerConnectionInfo exception, " + 
consumerGroup, e);
-            }
-
-            groupConsumeInfo.setGroup(consumerGroup);
-
-            if (consumeStats != null) {
-                
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
-                groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
-            }
-
-            if (consumerConnection != null) {
-                
groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
-                
groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
-                
groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
-                
groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
-            }
-        }
-        catch (Exception e) {
-            logger.warn("examineConsumeStats or examineConsumerConnectionInfo 
exception, "
-                + consumerGroup, e);
-        }
-        return groupConsumeInfo;
-    }
-
-    @Override
-    public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String 
groupName) {
-        return queryConsumeStatsList(null, groupName);
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, 
String groupName) {
-        ConsumeStats consumeStats = null;
-        try {
-            consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
-        }
-        catch (Exception e) {
-            throw propagate(e);
-        }
-        List<MessageQueue> mqList = 
Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new 
Predicate<MessageQueue>() {
-            @Override
-            public boolean apply(MessageQueue o) {
-                return StringUtils.isBlank(topic) || 
o.getTopic().equals(topic);
-            }
-        }));
-        Collections.sort(mqList);
-        List<TopicConsumerInfo> topicConsumerInfoList = Lists.newArrayList();
-        TopicConsumerInfo nowTopicConsumerInfo = null;
-        Map<MessageQueue, String> messageQueueClientMap = 
getClientConnection(groupName);
-        for (MessageQueue mq : mqList) {
-            if (nowTopicConsumerInfo == null || 
(!StringUtils.equals(mq.getTopic(), nowTopicConsumerInfo.getTopic()))) {
-                nowTopicConsumerInfo = new TopicConsumerInfo(mq.getTopic());
-                topicConsumerInfoList.add(nowTopicConsumerInfo);
-            }
-            QueueStatInfo queueStatInfo = 
QueueStatInfo.fromOffsetTableEntry(mq, consumeStats.getOffsetTable().get(mq));
-            queueStatInfo.setClientInfo(messageQueueClientMap.get(mq));
-            nowTopicConsumerInfo.appendQueueStatInfo(queueStatInfo);
-        }
-        return topicConsumerInfoList;
-    }
-
-    private Map<MessageQueue, String> getClientConnection(String groupName) {
-        Map<MessageQueue, String> results = Maps.newHashMap();
-        try {
-            ConsumerConnection consumerConnection = 
mqAdminExt.examineConsumerConnectionInfo(groupName);
-            for (Connection connection : 
consumerConnection.getConnectionSet()) {
-                String clinetId = connection.getClientId();
-                ConsumerRunningInfo consumerRunningInfo = 
mqAdminExt.getConsumerRunningInfo(groupName, clinetId, false);
-                for (MessageQueue messageQueue : 
consumerRunningInfo.getMqTable().keySet()) {
-//                    results.put(messageQueue, clinetId + " " + 
connection.getClientAddr());
-                    results.put(messageQueue, clinetId);
-                }
-            }
-        }
-        catch (Exception err) {
-            logger.error("op=getClientConnection_error", err);
-        }
-        return results;
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public Map<String /*groupName*/, TopicConsumerInfo> 
queryConsumeStatsListByTopicName(String topic) {
-        Map<String, TopicConsumerInfo> group2ConsumerInfoMap = 
Maps.newHashMap();
-        try {
-            GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
-            for (String group : groupList.getGroupList()) {
-                List<TopicConsumerInfo> topicConsumerInfoList = null;
-                try {
-                    topicConsumerInfoList = queryConsumeStatsList(topic, 
group);
-                }
-                catch (Exception ignore) {
-                }
-                group2ConsumerInfoMap.put(group, 
CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : 
topicConsumerInfoList.get(0));
-            }
-            return group2ConsumerInfoMap;
-        }
-        catch (Exception e) {
-            throw propagate(e);
-        }
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public Map<String, ConsumerGroupRollBackStat> 
resetOffset(ResetOffsetRequest resetOffsetRequest) {
-        Map<String, ConsumerGroupRollBackStat> groupRollbackStats = 
Maps.newHashMap();
-        for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) 
{
-            try {
-                Map<MessageQueue, Long> rollbackStatsMap =
-                    
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, 
resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
-                ConsumerGroupRollBackStat consumerGroupRollBackStat = new 
ConsumerGroupRollBackStat(true);
-                List<RollbackStats> rollbackStatsList = 
consumerGroupRollBackStat.getRollbackStatsList();
-                for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : 
rollbackStatsMap.entrySet()) {
-                    RollbackStats rollbackStats = new RollbackStats();
-                    
rollbackStats.setRollbackOffset(rollbackStatsEntty.getValue());
-                    
rollbackStats.setQueueId(rollbackStatsEntty.getKey().getQueueId());
-                    
rollbackStats.setBrokerName(rollbackStatsEntty.getKey().getBrokerName());
-                    rollbackStatsList.add(rollbackStats);
-                }
-                groupRollbackStats.put(consumerGroup, 
consumerGroupRollBackStat);
-            }
-            catch (MQClientException e) {
-                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
-                    try {
-                        ConsumerGroupRollBackStat consumerGroupRollBackStat = 
new ConsumerGroupRollBackStat(true);
-                        List<RollbackStats> rollbackStatsList = 
mqAdminExt.resetOffsetByTimestampOld(consumerGroup, 
resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true);
-                        
consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList);
-                        groupRollbackStats.put(consumerGroup, 
consumerGroupRollBackStat);
-                        continue;
-                    }
-                    catch (Exception err) {
-                        logger.error("op=resetOffset_which_not_online_error", 
err);
-                    }
-                }
-                else {
-                    logger.error("op=resetOffset_error", e);
-                }
-                groupRollbackStats.put(consumerGroup, new 
ConsumerGroupRollBackStat(false, e.getMessage()));
-            }
-            catch (Exception e) {
-                logger.error("op=resetOffset_error", e);
-                groupRollbackStats.put(consumerGroup, new 
ConsumerGroupRollBackStat(false, e.getMessage()));
-            }
-        }
-        return groupRollbackStats;
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String 
group) {
-        List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
-        try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            for (String brokerName : 
clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName
-                String brokerAddress = 
clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
-                SubscriptionGroupConfig subscriptionGroupConfig = 
mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
-                if (subscriptionGroupConfig == null) {
-                    continue;
-                }
-                consumerConfigInfoList.add(new 
ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig));
-            }
-        }
-        catch (Exception e) {
-            throw propagate(e);
-        }
-        return consumerConfigInfoList;
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) 
{
-        try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            for (String brokerName : 
deleteSubGroupRequest.getBrokerNameList()) {
-                logger.info("addr={} groupName={}", 
clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), 
deleteSubGroupRequest.getGroupName());
-                
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 deleteSubGroupRequest.getGroupName());
-            }
-        }
-        catch (Exception e) {
-            throw propagate(e);
-        }
-        return true;
-    }
-
-    @Override
-    public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo 
consumerConfigInfo) {
-        try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            for (String brokerName : 
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
-                consumerConfigInfo.getClusterNameList(), 
consumerConfigInfo.getBrokerNameList())) {
-                
mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 consumerConfigInfo.getSubscriptionGroupConfig());
-            }
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-        return true;
-    }
-
-    @Override
-    @MultiMQAdminCmdMethod
-    public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
-        Set<String> brokerNameSet = Sets.newHashSet();
-        try {
-            List<ConsumerConfigInfo> consumerConfigInfoList = 
examineSubscriptionGroupConfig(group);
-            for (ConsumerConfigInfo consumerConfigInfo : 
consumerConfigInfoList) {
-                brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList());
-            }
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-        return brokerNameSet;
-
-    }
-
-    @Override
-    public ConsumerConnection getConsumerConnection(String consumerGroup) {
-        try {
-            return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, 
String clientId, boolean jstack) {
-        try {
-            return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, 
jstack);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
deleted file mode 100644
index d32a344..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.console.service.impl;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.base.Ticker;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Resource;
-import org.apache.rocketmq.console.config.RMQConfigure;
-import org.apache.rocketmq.console.exception.ServiceException;
-import org.apache.rocketmq.console.service.DashboardCollectService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-@Service
-public class DashboardCollectServiceImpl implements DashboardCollectService {
-
-    @Resource
-    private RMQConfigure rmqConfigure;
-
-    private final static Logger log = 
LoggerFactory.getLogger(DashboardCollectServiceImpl.class);
-
-    private LoadingCache<String, List<String>> brokerMap = 
CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .concurrencyLevel(10)
-        .recordStats()
-        .ticker(Ticker.systemTicker())
-        .removalListener(new RemovalListener<Object, Object>() {
-            @Override
-            public void onRemoval(RemovalNotification<Object, Object> 
notification) {
-                log.debug(notification.getKey() + " was removed, cause is " + 
notification.getCause());
-            }
-        })
-        .build(
-            new CacheLoader<String, List<String>>() {
-                @Override
-                public List<String> load(String key) {
-                    List<String> list = Lists.newArrayList();
-                    return list;
-                }
-            }
-        );
-
-    private LoadingCache<String, List<String>> topicMap = 
CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .concurrencyLevel(10)
-        .recordStats()
-        .ticker(Ticker.systemTicker())
-        .removalListener(new RemovalListener<Object, Object>() {
-            @Override
-            public void onRemoval(RemovalNotification<Object, Object> 
notification) {
-                log.debug(notification.getKey() + " was removed, cause is " + 
notification.getCause());
-            }
-        })
-        .build(
-            new CacheLoader<String, List<String>>() {
-                @Override
-                public List<String> load(String key) {
-                    List<String> list = Lists.newArrayList();
-                    return list;
-                }
-            }
-        );
-
-    @Override
-    public LoadingCache<String, List<String>> getBrokerMap() {
-        return brokerMap;
-    }
-    @Override
-    public LoadingCache<String, List<String>> getTopicMap() {
-        return topicMap;
-    }
-
-    @Override
-    public Map<String, List<String>> jsonDataFile2map(File file) {
-        List<String> strings;
-        try {
-            strings = Files.readLines(file, Charsets.UTF_8);
-        }
-        catch (IOException e) {
-            throw Throwables.propagate(e);
-        }
-        StringBuffer sb = new StringBuffer();
-        for (String string : strings) {
-            sb.append(string);
-        }
-        JSONObject json = (JSONObject) JSONObject.parse(sb.toString());
-        Set<Map.Entry<String, Object>> entries = json.entrySet();
-        Map<String, List<String>> map = Maps.newHashMap();
-        for (Map.Entry<String, Object> entry : entries) {
-            JSONArray tpsArray = (JSONArray) entry.getValue();
-            if (tpsArray == null) {
-                continue;
-            }
-            Object[] tpsStrArray = tpsArray.toArray();
-            List<String> tpsList = Lists.newArrayList();
-            for (Object tpsObj : tpsStrArray) {
-                tpsList.add("" + tpsObj);
-            }
-            map.put(entry.getKey(), tpsList);
-        }
-        return map;
-    }
-
-    @Override
-    public Map<String, List<String>> getBrokerCache(String date) {
-        String dataLocationPath = rmqConfigure.getConsoleCollectData();
-        File file = new File(dataLocationPath + date + ".json");
-        if (!file.exists()) {
-            throw Throwables.propagate(new ServiceException(1, "This date 
have't data!"));
-        }
-        return jsonDataFile2map(file);
-    }
-
-    @Override
-    public Map<String, List<String>> getTopicCache(String date) {
-        String dataLocationPath = rmqConfigure.getConsoleCollectData();
-        File file = new File(dataLocationPath + date + "_topic" + ".json");
-        if (!file.exists()) {
-            throw Throwables.propagate(new ServiceException(1, "This date 
have't data!"));
-        }
-        return jsonDataFile2map(file);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
deleted file mode 100644
index 3189093..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.console.service.impl;
-
-import com.google.common.collect.Lists;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Resource;
-import org.apache.rocketmq.console.service.DashboardCollectService;
-import org.apache.rocketmq.console.service.DashboardService;
-import org.springframework.stereotype.Service;
-
-@Service
-public class DashboardServiceImpl implements DashboardService {
-
-    @Resource
-    private DashboardCollectService dashboardCollectService;
-    /**
-     * @param date format yyyy-MM-dd
-     */
-    @Override
-    public Map<String, List<String>> queryBrokerData(String date) {
-        return dashboardCollectService.getBrokerCache(date);
-    }
-
-    @Override
-    public Map<String, List<String>> queryTopicData(String date) {
-        return dashboardCollectService.getTopicCache(date);
-    }
-
-    /**
-     * @param date format yyyy-MM-dd
-     * @param topicName
-     */
-    @Override
-    public List<String> queryTopicData(String date, String topicName) {
-        if (null != dashboardCollectService.getTopicCache(date)) {
-            return dashboardCollectService.getTopicCache(date).get(topicName);
-        }
-        return null;
-    }
-
-    @Override
-    public List<String> queryTopicCurrentData() {
-        Date date = new Date();
-        DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-        Map<String, List<String>> topicCache = 
dashboardCollectService.getTopicCache(format.format(date));
-        List<String> result = Lists.newArrayList();
-        for (Map.Entry<String, List<String>> entry : topicCache.entrySet()) {
-            List<String> value = entry.getValue();
-            result.add(entry.getKey() + "," + value.get(value.size() - 
1).split(",")[4]);
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
deleted file mode 100644
index 0205a69..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.console.service.impl;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-import javax.annotation.Resource;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.Pair;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.Connection;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.console.model.MessageView;
-import org.apache.rocketmq.console.service.MessageService;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-import org.apache.rocketmq.tools.admin.api.MessageTrack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-@Service
-public class MessageServiceImpl implements MessageService {
-
-    private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
-    /**
-     * @see org.apache.rocketmq.store.config.MessageStoreConfig 
maxMsgsNumBatch = 64;
-     * @see org.apache.rocketmq.store.index.IndexService maxNum = 
Math.min(maxNum, 
this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
-     */
-    private final static int QUERY_MESSAGE_MAX_NUM = 64;
-    @Resource
-    private MQAdminExt mqAdminExt;
-
-    public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, 
final String msgId) {
-        try {
-
-            MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId);
-            List<MessageTrack> messageTrackList = 
messageTrackDetail(messageExt);
-            return new Pair<>(MessageView.fromMessageExt(messageExt), 
messageTrackList);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public List<MessageView> queryMessageByTopicAndKey(String topic, String 
key) {
-        try {
-            return Lists.transform(mqAdminExt.queryMessage(topic, key, 
QUERY_MESSAGE_MAX_NUM, 0, System.currentTimeMillis()).getMessageList(), new 
Function<MessageExt, MessageView>() {
-                @Override
-                public MessageView apply(MessageExt messageExt) {
-                    return MessageView.fromMessageExt(messageExt);
-                }
-            });
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-    }
-
-    @Override
-    public List<MessageView> queryMessageByTopic(String topic, final long 
begin, final long end) {
-        DefaultMQPullConsumer consumer = new 
DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
-        List<MessageView> messageViewList = Lists.newArrayList();
-        try {
-            String subExpression = "*";
-            consumer.start();
-            Set<MessageQueue> mqs = 
consumer.fetchSubscribeMessageQueues(topic);
-            for (MessageQueue mq : mqs) {
-                long minOffset = consumer.searchOffset(mq, begin);
-                long maxOffset = consumer.searchOffset(mq, end);
-                READQ:
-                for (long offset = minOffset; offset <= maxOffset; ) {
-                    try {
-                        if (messageViewList.size() > 2000) {
-                            break;
-                        }
-                        PullResult pullResult = consumer.pull(mq, 
subExpression, offset, 32);
-                        offset = pullResult.getNextBeginOffset();
-                        switch (pullResult.getPullStatus()) {
-                            case FOUND:
-
-                                List<MessageView> messageViewListByQuery = 
Lists.transform(pullResult.getMsgFoundList(), new Function<MessageExt, 
MessageView>() {
-                                    @Override
-                                    public MessageView apply(MessageExt 
messageExt) {
-                                        messageExt.setBody(null);
-                                        return 
MessageView.fromMessageExt(messageExt);
-                                    }
-                                });
-                                List<MessageView> filteredList = 
Lists.newArrayList(Iterables.filter(messageViewListByQuery, new 
Predicate<MessageView>() {
-                                    @Override
-                                    public boolean apply(MessageView 
messageView) {
-                                        if (messageView.getStoreTimestamp() < 
begin || messageView.getStoreTimestamp() > end) {
-                                            logger.info("begin={} end={} time 
not in range {} {}", begin, end, messageView.getStoreTimestamp(), new 
Date(messageView.getStoreTimestamp()).toString());
-                                        }
-                                        return messageView.getStoreTimestamp() 
>= begin && messageView.getStoreTimestamp() <= end;
-                                    }
-                                }));
-                                messageViewList.addAll(filteredList);
-                                break;
-                            case NO_MATCHED_MSG:
-                            case NO_NEW_MSG:
-                            case OFFSET_ILLEGAL:
-                                break READQ;
-                        }
-                    }
-                    catch (Exception e) {
-                        break;
-                    }
-                }
-            }
-            Collections.sort(messageViewList, new Comparator<MessageView>() {
-                @Override
-                public int compare(MessageView o1, MessageView o2) {
-                    if (o1.getStoreTimestamp() - o2.getStoreTimestamp() == 0) {
-                        return 0;
-                    }
-                    return (o1.getStoreTimestamp() > o2.getStoreTimestamp()) ? 
-1 : 1;
-                }
-            });
-            return messageViewList;
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-        finally {
-            consumer.shutdown();
-        }
-    }
-
-    @Override
-    public List<MessageTrack> messageTrackDetail(MessageExt msg) {
-        try {
-            return mqAdminExt.messageTrackDetail(msg);
-        }
-        catch (Exception e) {
-            logger.error("op=messageTrackDetailError", e);
-            return Collections.emptyList();
-        }
-    }
-
-
-    @Override
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, 
String msgId, String consumerGroup,
-        String clientId) {
-        if (StringUtils.isNotBlank(clientId)) {
-            try {
-                return mqAdminExt.consumeMessageDirectly(consumerGroup, 
clientId, topic, msgId);
-            }
-            catch (Exception e) {
-                throw Throwables.propagate(e);
-            }
-        }
-
-        try {
-            ConsumerConnection consumerConnection = 
mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
-            for (Connection connection : 
consumerConnection.getConnectionSet()) {
-                if (StringUtils.isBlank(connection.getClientId())) {
-                    continue;
-                }
-                logger.info("clientId={}", connection.getClientId());
-                return mqAdminExt.consumeMessageDirectly(consumerGroup, 
connection.getClientId(), topic, msgId);
-            }
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-        throw new IllegalStateException("NO CONSUMER");
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
deleted file mode 100644
index d3a109d..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.console.service.impl;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Throwables;
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.console.config.RMQConfigure;
-import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
-import org.apache.rocketmq.console.service.MonitorService;
-import org.apache.rocketmq.console.util.JsonUtil;
-import org.springframework.stereotype.Service;
-
-@Service
-public class MonitorServiceImpl implements MonitorService {
-
-
-    @Resource
-    private RMQConfigure rmqConfigure;
-
-    private Map<String, ConsumerMonitorConfig> configMap = new 
ConcurrentHashMap<>();
-
-    @Override
-    public boolean createOrUpdateConsumerMonitor(String name, 
ConsumerMonitorConfig config) {
-        configMap.put(name, config);// todo if write map success but write 
file fail
-        writeToFile(getConsumerMonitorConfigDataPath(), configMap);
-        return true;
-    }
-
-    @Override
-    public Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig() {
-        return configMap;
-    }
-
-    @Override
-    public ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String 
consumeGroupName) {
-        return configMap.get(consumeGroupName);
-    }
-
-    @Override
-    public boolean deleteConsumerMonitor(String consumeGroupName) {
-        configMap.remove(consumeGroupName);
-        writeToFile(getConsumerMonitorConfigDataPath(), configMap);
-        return true;
-    }
-
-    //rocketmq.console.data.path/monitor/consumerMonitorConfig.json
-    private String getConsumerMonitorConfigDataPath() {
-        return rmqConfigure.getRocketMqConsoleDataPath() + File.separatorChar 
+ "monitor" + File.separatorChar + "consumerMonitorConfig.json";
-    }
-
-    private String getConsumerMonitorConfigDataPathBackUp() {
-        return getConsumerMonitorConfigDataPath() + ".bak";
-    }
-
-    private void writeToFile(String path, Object data) {
-        writeDataJsonToFile(path, JsonUtil.obj2String(data));
-    }
-
-    private void writeDataJsonToFile(String path, String dataStr) {
-        try {
-            MixAll.string2File(dataStr, path);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @PostConstruct
-    private void loadData() {
-        String content = 
MixAll.file2String(getConsumerMonitorConfigDataPath());
-        if (content == null) {
-            content = 
MixAll.file2String(getConsumerMonitorConfigDataPathBackUp());
-        }
-        if (content == null) {
-            return;
-        }
-        configMap = JsonUtil.string2Obj(content, new 
TypeReference<ConcurrentHashMap<String, ConsumerMonitorConfig>>() {
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
deleted file mode 100644
index 84e6d2f..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.console.service.impl;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Resource;
-import org.apache.rocketmq.console.config.RMQConfigure;
-import org.apache.rocketmq.console.service.AbstractCommonService;
-import org.apache.rocketmq.console.service.OpsService;
-import org.apache.rocketmq.console.service.checker.CheckerType;
-import org.apache.rocketmq.console.service.checker.RocketMqChecker;
-import org.springframework.stereotype.Service;
-
-@Service
-public class OpsServiceImpl extends AbstractCommonService implements 
OpsService {
-
-    @Resource
-    private RMQConfigure rMQConfigure;
-
-    @Resource
-    private List<RocketMqChecker> rocketMqCheckerList;
-
-    @Override
-    public Map<String, Object> homePageInfo() {
-        Map<String, Object> homePageInfoMap = Maps.newHashMap();
-        homePageInfoMap.put("namesvrAddrList", 
Splitter.on(";").splitToList(rMQConfigure.getNamesrvAddr()));
-        homePageInfoMap.put("useVIPChannel", 
Boolean.valueOf(rMQConfigure.getIsVIPChannel()));
-        return homePageInfoMap;
-    }
-
-    @Override
-    public void updateNameSvrAddrList(String nameSvrAddrList) {
-        rMQConfigure.setNamesrvAddr(nameSvrAddrList);
-    }
-
-    @Override
-    public String getNameSvrList() {
-        return rMQConfigure.getNamesrvAddr();
-    }
-
-    @Override
-    public Map<CheckerType, Object> rocketMqStatusCheck() {
-        Map<CheckerType, Object> checkResultMap = Maps.newHashMap();
-        for (RocketMqChecker rocketMqChecker : rocketMqCheckerList) {
-            checkResultMap.put(rocketMqChecker.checkerType(), 
rocketMqChecker.doCheck());
-        }
-        return checkResultMap;
-    }
-
-    @Override public boolean updateIsVIPChannel(String useVIPChannel) {
-        rMQConfigure.setIsVIPChannel(useVIPChannel);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
deleted file mode 100644
index 3e46958..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.console.service.impl;
-
-import com.google.common.base.Throwables;
-import javax.annotation.Resource;
-import org.apache.rocketmq.common.protocol.body.ProducerConnection;
-import org.apache.rocketmq.console.service.ProducerService;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-import org.springframework.stereotype.Service;
-
-@Service
-public class ProducerServiceImpl implements ProducerService {
-    @Resource
-    private MQAdminExt mqAdminExt;
-
-    @Override
-    public ProducerConnection getProducerConnection(String producerGroup, 
String topic) {
-        try {
-            return mqAdminExt.examineProducerConnectionInfo(producerGroup, 
topic);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
deleted file mode 100644
index 117bcfd..0000000
--- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.console.service.impl;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.common.protocol.body.TopicList;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.console.config.RMQConfigure;
-import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
-import org.apache.rocketmq.console.model.request.TopicConfigInfo;
-import org.apache.rocketmq.console.service.AbstractCommonService;
-import org.apache.rocketmq.console.service.TopicService;
-import org.apache.rocketmq.tools.command.CommandUtil;
-import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-@Service
-public class TopicServiceImpl extends AbstractCommonService implements 
TopicService {
-
-    @Autowired
-    private RMQConfigure rMQConfigure;
-
-    @Override
-    public TopicList fetchAllTopicList() {
-        try {
-            return mqAdminExt.fetchAllTopicList();
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public TopicStatsTable stats(String topic) {
-        try {
-            return mqAdminExt.examineTopicStats(topic);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public TopicRouteData route(String topic) {
-        try {
-            return mqAdminExt.examineTopicRouteInfo(topic);
-        }
-        catch (Exception ex) {
-            throw Throwables.propagate(ex);
-        }
-    }
-
-    @Override
-    public GroupList queryTopicConsumerInfo(String topic) {
-        try {
-            return mqAdminExt.queryTopicConsumeByWho(topic);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
-        TopicConfig topicConfig = new TopicConfig();
-        BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
-        try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            for (String brokerName : 
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
-                topicCreateOrUpdateRequest.getClusterNameList(), 
topicCreateOrUpdateRequest.getBrokerNameList())) {
-                
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 topicConfig);
-            }
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-    }
-
-    @Override
-    public TopicConfig examineTopicConfig(String topic, String brokerName) {
-        ClusterInfo clusterInfo = null;
-        try {
-            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-        return 
mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 topic);
-    }
-
-    @Override
-    public List<TopicConfigInfo> examineTopicConfig(String topic) {
-        List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList();
-        TopicRouteData topicRouteData = route(topic);
-        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
-            TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
-            TopicConfig topicConfig = examineTopicConfig(topic, 
brokerData.getBrokerName());
-            BeanUtils.copyProperties(topicConfig, topicConfigInfo);
-            
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
-            topicConfigInfoList.add(topicConfigInfo);
-        }
-        return topicConfigInfoList;
-    }
-
-    @Override
-    public boolean deleteTopic(String topic, String clusterName) {
-        try {
-            if (StringUtils.isBlank(clusterName)) {
-                return deleteTopic(topic);
-            }
-            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
-            mqAdminExt.deleteTopicInBroker(masterSet, topic);
-            Set<String> nameServerSet = null;
-            if (StringUtils.isNotBlank(rMQConfigure.getNamesrvAddr())) {
-                String[] ns = rMQConfigure.getNamesrvAddr().split(";");
-                nameServerSet = new HashSet<String>(Arrays.asList(ns));
-            }
-            mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-        return true;
-    }
-
-    @Override
-    public boolean deleteTopic(String topic) {
-        ClusterInfo clusterInfo = null;
-        try {
-            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
-        }
-        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
-            deleteTopic(topic, clusterName);
-        }
-        return true;
-    }
-
-    @Override
-    public boolean deleteTopicInBroker(String brokerName, String topic) {
-
-        try {
-            ClusterInfo clusterInfo = null;
-            try {
-                clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            }
-            catch (Exception e) {
-                throw Throwables.propagate(e);
-            }
-            
mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()),
 topic);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-        return true;
-    }
-
-    @Override
-    public SendResult sendTopicMessageRequest(SendTopicMessageRequest 
sendTopicMessageRequest) {
-        DefaultMQProducer producer = new 
DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
-        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
-        producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
-        try {
-            producer.start();
-            Message msg = new Message(sendTopicMessageRequest.getTopic(),
-                sendTopicMessageRequest.getTag(),
-                sendTopicMessageRequest.getKey(),
-                sendTopicMessageRequest.getMessageBody().getBytes()
-            );
-            return producer.send(msg);
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-        finally {
-            producer.shutdown();
-        }
-    }
-
-}


Reply via email to