http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
new file mode 100644
index 0000000..e225edc
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.console.service.ClusterService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link ClusterService} implementation that reads cluster and broker state through the injected
+ * {@link MQAdminExt}.
+ */
+@Service
+public class ClusterServiceImpl implements ClusterService {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
+
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    /**
+     * Fetches the broker cluster info and the runtime stats table of every broker address in it.
+     *
+     * @return a map holding the {@link ClusterInfo} under {@code "clusterInfo"} and, under
+     *         {@code "brokerServer"}, a map from broker name to (broker id to runtime stats table)
+     * @throws RuntimeException if any admin call fails; the failure is rethrown through
+     *                          {@link Throwables#propagate(Throwable)}
+     */
+    @Override
+    public Map<String, Object> list() {
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            // Serializing the whole cluster info is only worth doing when the line is really logged.
+            if (logger.isInfoEnabled()) {
+                logger.info("op=look_clusterInfo {}", JsonUtil.obj2String(clusterInfo));
+            }
+
+            Map<String/* brokerName */, Map<Long/* brokerId */, Object/* brokerDetail */>> brokerServer =
+                Maps.newHashMap();
+            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+                Map<Long, Object> brokerMasterSlaveMap = Maps.newHashMap();
+                for (Map.Entry<Long/* brokerId */, String/* broker address */> brokerAddr
+                    : brokerData.getBrokerAddrs().entrySet()) {
+                    KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats(brokerAddr.getValue());
+                    brokerMasterSlaveMap.put(brokerAddr.getKey(), kvTable.getTable());
+                }
+                brokerServer.put(brokerData.getBrokerName(), brokerMasterSlaveMap);
+            }
+
+            Map<String, Object> resultMap = Maps.newHashMap();
+            resultMap.put("clusterInfo", clusterInfo);
+            resultMap.put("brokerServer", brokerServer);
+            return resultMap;
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    /**
+     * Reads the configuration of a single broker.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return the properties returned by {@link MQAdminExt#getBrokerConfig(String)}
+     * @throws RuntimeException if the admin call fails
+     */
+    @Override
+    public Properties getBrokerConfig(String brokerAddr) {
+        try {
+            return mqAdminExt.getBrokerConfig(brokerAddr);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
new file mode 100644
index 0000000..4485a7d
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.RollbackStats;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import 
org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.model.QueueStatInfo;
+import org.apache.rocketmq.console.model.TopicConsumerInfo;
+import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
+import org.apache.rocketmq.console.service.AbstractCommonService;
+import org.apache.rocketmq.console.service.ConsumerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static com.google.common.base.Throwables.propagate;
+
+@Service
+public class ConsumerServiceImpl extends AbstractCommonService implements 
ConsumerService {
+    private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
+
+    /**
+     * Lists the consume summary of every subscription group found on the cluster's brokers.
+     * <p>
+     * Group names are gathered from each broker in the cluster info, de-duplicated, resolved one by
+     * one through {@link #queryGroup(String)} and returned in their natural sort order.
+     *
+     * @return the sorted summaries
+     * @throws RuntimeException if the cluster info or a broker's subscription groups cannot be read
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public List<GroupConsumeInfo> queryGroupList() {
+        Set<String> groupNames = Sets.newHashSet();
+        try {
+            ClusterInfo cluster = mqAdminExt.examineBrokerClusterInfo();
+            for (BrokerData broker : cluster.getBrokerAddrTable().values()) {
+                String brokerAddr = broker.selectBrokerAddr();
+                // 3000L is presumably a timeout in milliseconds -- confirm against MQAdminExt.
+                SubscriptionGroupWrapper groups = mqAdminExt.getAllSubscriptionGroup(brokerAddr, 3000L);
+                groupNames.addAll(groups.getSubscriptionGroupTable().keySet());
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+
+        List<GroupConsumeInfo> summaries = Lists.newArrayList();
+        for (String groupName : groupNames) {
+            GroupConsumeInfo summary = queryGroup(groupName);
+            summaries.add(summary);
+        }
+        Collections.sort(summaries);
+        return summaries;
+    }
+
+    /**
+     * Builds the consume summary of a single consumer group.
+     * <p>
+     * Consume stats and connection info are looked up independently. A failure of either lookup is
+     * logged at warn level and the fields derived from it are left unset on the result.
+     *
+     * @param consumerGroup name of the consumer group
+     * @return the summary object, never {@code null}
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public GroupConsumeInfo queryGroup(String consumerGroup) {
+        GroupConsumeInfo summary = new GroupConsumeInfo();
+        try {
+            summary.setGroup(consumerGroup);
+
+            ConsumeStats stats = null;
+            try {
+                stats = mqAdminExt.examineConsumeStats(consumerGroup);
+            }
+            catch (Exception e) {
+                logger.warn("examineConsumeStats exception, " + consumerGroup, e);
+            }
+
+            ConsumerConnection connections = null;
+            try {
+                connections = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+            }
+            catch (Exception e) {
+                logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
+            }
+
+            if (stats != null) {
+                int tps = (int) stats.getConsumeTps();
+                summary.setConsumeTps(tps);
+                summary.setDiffTotal(stats.computeTotalDiff());
+            }
+
+            if (connections != null) {
+                summary.setCount(connections.getConnectionSet().size());
+                summary.setMessageModel(connections.getMessageModel());
+                summary.setConsumeType(connections.getConsumeType());
+                String versionDesc = MQVersion.getVersionDesc(connections.computeMinVersion());
+                summary.setVersion(versionDesc);
+            }
+        }
+        catch (Exception e) {
+            // Anything thrown while copying values onto the summary is logged, not propagated.
+            logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, "
+                + consumerGroup, e);
+        }
+        return summary;
+    }
+
+    /**
+     * Returns the consume stats of a group across all of its topics.
+     *
+     * @param groupName name of the consumer group
+     * @return the result of {@link #queryConsumeStatsList(String, String)} with no topic filter
+     */
+    @Override
+    public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) {
+        // A null topic disables the topic filter in queryConsumeStatsList.
+        final String noTopicFilter = null;
+        return queryConsumeStatsList(noTopicFilter, groupName);
+    }
+
+    /**
+     * Returns the per-queue consume stats of a group, grouped by topic.
+     *
+     * @param topic     topic to restrict the result to; a blank value keeps every topic
+     * @param groupName name of the consumer group
+     * @return one {@link TopicConsumerInfo} per run of consecutive same-topic queues in sorted order
+     * @throws RuntimeException if the consume stats cannot be fetched
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
+        final ConsumeStats stats;
+        try {
+            stats = mqAdminExt.examineConsumeStats(groupName);
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+
+        // Keep only the queues of the requested topic (or all of them when no topic was given).
+        List<MessageQueue> queues = Lists.newArrayList();
+        for (MessageQueue candidate : stats.getOffsetTable().keySet()) {
+            if (StringUtils.isBlank(topic) || candidate.getTopic().equals(topic)) {
+                queues.add(candidate);
+            }
+        }
+        // NOTE(review): the grouping below assumes MessageQueue's natural ordering keeps the queues
+        // of one topic adjacent -- confirm against MessageQueue#compareTo.
+        Collections.sort(queues);
+
+        List<TopicConsumerInfo> perTopic = Lists.newArrayList();
+        TopicConsumerInfo current = null;
+        Map<MessageQueue, String> clientByQueue = getClientConnection(groupName);
+        for (MessageQueue queue : queues) {
+            boolean sameTopic = current != null && StringUtils.equals(queue.getTopic(), current.getTopic());
+            if (!sameTopic) {
+                current = new TopicConsumerInfo(queue.getTopic());
+                perTopic.add(current);
+            }
+            QueueStatInfo queueStat = QueueStatInfo.fromOffsetTableEntry(queue, stats.getOffsetTable().get(queue));
+            queueStat.setClientInfo(clientByQueue.get(queue));
+            current.appendQueueStatInfo(queueStat);
+        }
+        return perTopic;
+    }
+
+    /**
+     * Maps every message queue reported by the group's connected consumers to the id of the client
+     * that reported it.
+     * <p>
+     * Best effort: any failure is logged at error level and whatever was collected up to that point
+     * (possibly nothing) is returned.
+     *
+     * @param groupName name of the consumer group
+     * @return message queue to client id; never {@code null}
+     */
+    private Map<MessageQueue, String> getClientConnection(String groupName) {
+        Map<MessageQueue, String> clientIdByQueue = Maps.newHashMap();
+        try {
+            ConsumerConnection groupConnections = mqAdminExt.examineConsumerConnectionInfo(groupName);
+            for (Connection connection : groupConnections.getConnectionSet()) {
+                String clientId = connection.getClientId();
+                ConsumerRunningInfo runningInfo = mqAdminExt.getConsumerRunningInfo(groupName, clientId, false);
+                for (MessageQueue queue : runningInfo.getMqTable().keySet()) {
+                    clientIdByQueue.put(queue, clientId);
+                }
+            }
+        }
+        catch (Exception err) {
+            logger.error("op=getClientConnection_error", err);
+        }
+        return clientIdByQueue;
+    }
+
+    /**
+     * Returns, for every group consuming a topic, that group's consume stats on the topic.
+     *
+     * @param topic name of the topic
+     * @return group name to consume info; a group without stats for the topic is mapped to an empty
+     *         {@link TopicConsumerInfo} created for the topic
+     * @throws RuntimeException if the consuming groups or their stats cannot be fetched
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) {
+        Map<String, TopicConsumerInfo> infoByGroup = Maps.newHashMap();
+        try {
+            GroupList consumingGroups = mqAdminExt.queryTopicConsumeByWho(topic);
+            for (String group : consumingGroups.getGroupList()) {
+                List<TopicConsumerInfo> statsForTopic = queryConsumeStatsList(topic, group);
+                TopicConsumerInfo info;
+                if (CollectionUtils.isEmpty(statsForTopic)) {
+                    info = new TopicConsumerInfo(topic);
+                }
+                else {
+                    info = statsForTopic.get(0);
+                }
+                infoByGroup.put(group, info);
+            }
+            return infoByGroup;
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+    }
+
+    /**
+     * Resets the consume offset of each requested group on the requested topic to a timestamp.
+     * <p>
+     * Groups are processed independently. When the reset fails with
+     * {@link ResponseCode#CONSUMER_NOT_ONLINE}, {@code resetOffsetByTimestampOld} is tried instead.
+     * Any other failure, or a failure of that second attempt, is logged and recorded as a failed
+     * {@link ConsumerGroupRollBackStat} carrying the exception message.
+     *
+     * @param resetOffsetRequest topic, groups, reset time and force flag
+     * @return consumer group name to the outcome of its reset
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) {
+        Map<String, ConsumerGroupRollBackStat> outcomeByGroup = Maps.newHashMap();
+        for (String group : resetOffsetRequest.getConsumerGroupList()) {
+            try {
+                Map<MessageQueue, Long> offsetByQueue = mqAdminExt.resetOffsetByTimestamp(
+                    resetOffsetRequest.getTopic(), group, resetOffsetRequest.getResetTime(),
+                    resetOffsetRequest.isForce());
+                ConsumerGroupRollBackStat outcome = new ConsumerGroupRollBackStat(true);
+                List<RollbackStats> queueStats = outcome.getRollbackStatsList();
+                for (Map.Entry<MessageQueue, Long> queueOffset : offsetByQueue.entrySet()) {
+                    MessageQueue queue = queueOffset.getKey();
+                    RollbackStats stats = new RollbackStats();
+                    stats.setRollbackOffset(queueOffset.getValue());
+                    stats.setQueueId(queue.getQueueId());
+                    stats.setBrokerName(queue.getBrokerName());
+                    queueStats.add(stats);
+                }
+                outcomeByGroup.put(group, outcome);
+            }
+            catch (MQClientException e) {
+                if (ResponseCode.CONSUMER_NOT_ONLINE != e.getResponseCode()) {
+                    logger.error("op=resetOffset_error", e);
+                }
+                else {
+                    try {
+                        ConsumerGroupRollBackStat outcome = new ConsumerGroupRollBackStat(true);
+                        // NOTE(review): the force flag is hard-coded to true here instead of using
+                        // resetOffsetRequest.isForce() -- confirm this is intended.
+                        List<RollbackStats> queueStats = mqAdminExt.resetOffsetByTimestampOld(
+                            group, resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true);
+                        outcome.setRollbackStatsList(queueStats);
+                        outcomeByGroup.put(group, outcome);
+                        // The second attempt succeeded: skip the failure record below.
+                        continue;
+                    }
+                    catch (Exception err) {
+                        logger.error("op=resetOffset_which_not_online_error", err);
+                    }
+                }
+                outcomeByGroup.put(group, new ConsumerGroupRollBackStat(false, e.getMessage()));
+            }
+            catch (Exception e) {
+                logger.error("op=resetOffset_error", e);
+                outcomeByGroup.put(group, new ConsumerGroupRollBackStat(false, e.getMessage()));
+            }
+        }
+        return outcomeByGroup;
+    }
+
+    /**
+     * Collects the subscription group configuration of a group from every broker in the cluster.
+     *
+     * @param group name of the subscription group
+     * @return one entry per broker that returned a configuration for the group; brokers that
+     *         returned {@code null} are skipped
+     * @throws RuntimeException if the cluster info or a broker's configuration cannot be read
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
+        List<ConsumerConfigInfo> configs = Lists.newArrayList();
+        try {
+            ClusterInfo cluster = mqAdminExt.examineBrokerClusterInfo();
+            for (Map.Entry<String, BrokerData> broker : cluster.getBrokerAddrTable().entrySet()) {
+                String brokerAddr = broker.getValue().selectBrokerAddr();
+                SubscriptionGroupConfig groupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddr, group);
+                if (groupConfig != null) {
+                    configs.add(new ConsumerConfigInfo(Lists.newArrayList(broker.getKey()), groupConfig));
+                }
+            }
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        return configs;
+    }
+
+    /**
+     * Deletes a subscription group from each of the requested brokers.
+     *
+     * @param deleteSubGroupRequest group name and the names of the brokers to delete it from
+     * @return always {@code true}; failures are reported by exception
+     * @throws RuntimeException if the cluster info cannot be read, a broker name is unknown, or a
+     *                          delete call fails
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
+                // Resolve the address once so the logged address is exactly the one the delete is
+                // sent to, even if selectBrokerAddr() does not return the same value on every call.
+                String brokerAddr = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
+                logger.info("addr={} groupName={}", brokerAddr, deleteSubGroupRequest.getGroupName());
+                mqAdminExt.deleteSubscriptionGroup(brokerAddr, deleteSubGroupRequest.getGroupName());
+            }
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        return true;
+    }
+
+    /**
+     * Creates or updates a subscription group configuration on every targeted broker.
+     * <p>
+     * The targeted brokers are those returned by {@code changeToBrokerNameSet} for the request's
+     * cluster names and broker names.
+     *
+     * @param consumerConfigInfo the configuration together with the clusters/brokers to apply it to
+     * @return always {@code true}; failures are reported by exception
+     * @throws RuntimeException if the cluster info cannot be read or an update call fails
+     */
+    @Override
+    public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
+        try {
+            ClusterInfo cluster = mqAdminExt.examineBrokerClusterInfo();
+            for (String targetBroker : changeToBrokerNameSet(cluster.getClusterAddrTable(),
+                consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) {
+                String brokerAddr = cluster.getBrokerAddrTable().get(targetBroker).selectBrokerAddr();
+                mqAdminExt.createAndUpdateSubscriptionGroupConfig(brokerAddr,
+                    consumerConfigInfo.getSubscriptionGroupConfig());
+            }
+            return true;
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    /**
+     * Returns the names of the brokers that hold a configuration for a subscription group.
+     *
+     * @param group name of the subscription group
+     * @return the broker names taken from {@link #examineSubscriptionGroupConfig(String)}
+     * @throws RuntimeException if the configurations cannot be read
+     */
+    @Override
+    @MultiMQAdminCmdMethod
+    public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
+        Set<String> brokerNames = Sets.newHashSet();
+        try {
+            for (ConsumerConfigInfo config : examineSubscriptionGroupConfig(group)) {
+                brokerNames.addAll(config.getBrokerNameList());
+            }
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        return brokerNames;
+    }
+
+    /**
+     * Fetches the connection info of a consumer group.
+     *
+     * @param consumerGroup name of the consumer group
+     * @return the connection info returned by the admin client
+     * @throws RuntimeException if the admin call fails
+     */
+    @Override
+    public ConsumerConnection getConsumerConnection(String consumerGroup) {
+        final ConsumerConnection connection;
+        try {
+            connection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        return connection;
+    }
+
+    /**
+     * Fetches the running info of one client of a consumer group.
+     *
+     * @param consumerGroup name of the consumer group
+     * @param clientId      id of the client to query
+     * @param jstack        forwarded unchanged to the admin client
+     * @return the running info returned by the admin client
+     * @throws RuntimeException if the admin call fails
+     */
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
+        final ConsumerRunningInfo runningInfo;
+        try {
+            runningInfo = mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        return runningInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
new file mode 100644
index 0000000..d32a344
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.base.Ticker;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.exception.ServiceException;
+import org.apache.rocketmq.console.service.DashboardCollectService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * {@link DashboardCollectService} implementation that exposes two in-memory sample caches and reads
+ * collected samples back from JSON files named after the requested date.
+ */
+@Service
+public class DashboardCollectServiceImpl implements DashboardCollectService {
+
+    private final static Logger log = LoggerFactory.getLogger(DashboardCollectServiceImpl.class);
+
+    @Resource
+    private RMQConfigure rmqConfigure;
+
+    private final LoadingCache<String, List<String>> brokerMap = newSampleCache();
+
+    private final LoadingCache<String, List<String>> topicMap = newSampleCache();
+
+    /**
+     * Creates a cache of at most 1000 entries whose loader supplies a new empty, mutable list for a
+     * key that is not cached yet. Removals are logged at debug level.
+     */
+    private static LoadingCache<String, List<String>> newSampleCache() {
+        return CacheBuilder.newBuilder()
+            .maximumSize(1000)
+            .concurrencyLevel(10)
+            .recordStats()
+            .ticker(Ticker.systemTicker())
+            .removalListener(new RemovalListener<Object, Object>() {
+                @Override
+                public void onRemoval(RemovalNotification<Object, Object> notification) {
+                    log.debug("{} was removed, cause is {}", notification.getKey(), notification.getCause());
+                }
+            })
+            .build(
+                new CacheLoader<String, List<String>>() {
+                    @Override
+                    public List<String> load(String key) {
+                        return Lists.newArrayList();
+                    }
+                }
+            );
+    }
+
+    @Override
+    public LoadingCache<String, List<String>> getBrokerMap() {
+        return brokerMap;
+    }
+
+    @Override
+    public LoadingCache<String, List<String>> getTopicMap() {
+        return topicMap;
+    }
+
+    /**
+     * Parses a JSON object file whose values are arrays into a map of string lists.
+     *
+     * @param file UTF-8 encoded file holding one JSON object
+     * @return key to the string form of each array element; entries with a {@code null} value are
+     *         skipped, and a blank file yields an empty map
+     * @throws RuntimeException if the file cannot be read
+     */
+    @Override
+    public Map<String, List<String>> jsonDataFile2map(File file) {
+        List<String> lines;
+        try {
+            lines = Files.readLines(file, Charsets.UTF_8);
+        }
+        catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+        // Single-threaded concatenation: StringBuilder avoids StringBuffer's needless locking.
+        StringBuilder content = new StringBuilder();
+        for (String line : lines) {
+            content.append(line);
+        }
+
+        Map<String, List<String>> map = Maps.newHashMap();
+        JSONObject json = (JSONObject) JSONObject.parse(content.toString());
+        if (json == null) {
+            // The parser returns null for blank input; treat that as "no samples" instead of
+            // failing with a NullPointerException below.
+            return map;
+        }
+        for (Map.Entry<String, Object> entry : json.entrySet()) {
+            JSONArray tpsArray = (JSONArray) entry.getValue();
+            if (tpsArray == null) {
+                continue;
+            }
+            List<String> tpsList = Lists.newArrayList();
+            for (Object tpsObj : tpsArray) {
+                tpsList.add(String.valueOf(tpsObj));
+            }
+            map.put(entry.getKey(), tpsList);
+        }
+        return map;
+    }
+
+    /**
+     * Reads the broker samples stored for a date.
+     *
+     * @param date date part of the data file name
+     * @return the parsed content of {@code <collect data path><date>.json}
+     * @throws RuntimeException carrying or wrapping a {@link ServiceException} when no data file
+     *                          exists for the date
+     */
+    @Override
+    public Map<String, List<String>> getBrokerCache(String date) {
+        return jsonDataFile2map(resolveDataFile(date, ".json"));
+    }
+
+    /**
+     * Reads the topic samples stored for a date.
+     *
+     * @param date date part of the data file name
+     * @return the parsed content of {@code <collect data path><date>_topic.json}
+     * @throws RuntimeException carrying or wrapping a {@link ServiceException} when no data file
+     *                          exists for the date
+     */
+    @Override
+    public Map<String, List<String>> getTopicCache(String date) {
+        return jsonDataFile2map(resolveDataFile(date, "_topic" + ".json"));
+    }
+
+    /**
+     * Locates the data file for a date, failing when it does not exist.
+     * <p>
+     * The date is concatenated into a file system path, so a value that could climb out of the
+     * data directory ({@code ..} or a path separator) is reported exactly like a missing file.
+     */
+    private File resolveDataFile(String date, String suffix) {
+        boolean unsafeName = date == null || date.contains("..") || date.contains("/") || date.contains("\\");
+        File file = unsafeName ? null : new File(rmqConfigure.getConsoleCollectData() + date + suffix);
+        if (file == null || !file.exists()) {
+            throw Throwables.propagate(new ServiceException(1, "This date have't data!"));
+        }
+        return file;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
new file mode 100644
index 0000000..3189093
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.collect.Lists;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.service.DashboardCollectService;
+import org.apache.rocketmq.console.service.DashboardService;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DashboardServiceImpl implements DashboardService {
+
+    @Resource
+    private DashboardCollectService dashboardCollectService;
+    /**
+     * @param date format yyyy-MM-dd
+     */
+    @Override
+    public Map<String, List<String>> queryBrokerData(String date) {
+        return dashboardCollectService.getBrokerCache(date);
+    }
+
+    @Override
+    public Map<String, List<String>> queryTopicData(String date) {
+        return dashboardCollectService.getTopicCache(date);
+    }
+
+    /**
+     * @param date format yyyy-MM-dd
+     * @param topicName
+     */
+    @Override
+    public List<String> queryTopicData(String date, String topicName) {
+        if (null != dashboardCollectService.getTopicCache(date)) {
+            return dashboardCollectService.getTopicCache(date).get(topicName);
+        }
+        return null;
+    }
+
+    @Override
+    public List<String> queryTopicCurrentData() {
+        Date date = new Date();
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+        Map<String, List<String>> topicCache = 
dashboardCollectService.getTopicCache(format.format(date));
+        List<String> result = Lists.newArrayList();
+        for (Map.Entry<String, List<String>> entry : topicCache.entrySet()) {
+            List<String> value = entry.getValue();
+            result.add(entry.getKey() + "," + value.get(value.size() - 
1).split(",")[4]);
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
new file mode 100644
index 0000000..0205a69
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.console.model.MessageView;
+import org.apache.rocketmq.console.service.MessageService;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Message operations of the console: view one message, query messages by key or by
+ * store-time window, fetch the consume track of a message and ask a consumer to
+ * consume a message directly.
+ */
+@Service
+public class MessageServiceImpl implements MessageService {
+
+    private final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
+    /**
+     * Maximum number of messages requested from {@code queryMessage}.
+     *
+     * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
+     * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
+     */
+    private final static int QUERY_MESSAGE_MAX_NUM = 64;
+    /** Scanning by time window stops once more than this many messages were collected. */
+    private final static int QUERY_BY_TOPIC_RESULT_LIMIT = 2000;
+    /** Maximum number of messages requested by a single pull. */
+    private final static int PULL_BATCH_SIZE = 32;
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    /**
+     * Loads one message together with its consume track.
+     *
+     * @param subject first argument handed to {@code mqAdminExt.viewMessage}
+     * @param msgId id of the message to load
+     * @return the message view paired with its track list; the track list is empty when
+     *         the track lookup fails (see {@link #messageTrackDetail(MessageExt)})
+     */
+    public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) {
+        try {
+            MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId);
+            List<MessageTrack> messageTrackList = messageTrackDetail(messageExt);
+            return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Queries messages of a topic by key, from time 0 up to now, limited to
+     * {@link #QUERY_MESSAGE_MAX_NUM} results.
+     *
+     * @param topic topic to search
+     * @param key message key to search for
+     * @return a lazily converted view of the messages returned by the admin client
+     */
+    @Override
+    public List<MessageView> queryMessageByTopicAndKey(String topic, String key) {
+        try {
+            return Lists.transform(mqAdminExt.queryMessage(topic, key, QUERY_MESSAGE_MAX_NUM, 0, System.currentTimeMillis()).getMessageList(), new Function<MessageExt, MessageView>() {
+                @Override
+                public MessageView apply(MessageExt messageExt) {
+                    return MessageView.fromMessageExt(messageExt);
+                }
+            });
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    /**
+     * Pulls the messages of every queue of a topic whose store timestamp lies within
+     * {@code [begin, end]} and returns them newest first. Message bodies are dropped.
+     * Scanning stops once more than {@link #QUERY_BY_TOPIC_RESULT_LIMIT} messages were
+     * collected.
+     *
+     * @param topic topic to scan
+     * @param begin lower bound of the store timestamp (inclusive)
+     * @param end upper bound of the store timestamp (inclusive)
+     * @return matching messages sorted by store timestamp, descending
+     */
+    @Override
+    public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
+        List<MessageView> messageViewList = Lists.newArrayList();
+        try {
+            String subExpression = "*";
+            consumer.start();
+            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
+            for (MessageQueue mq : mqs) {
+                // Once the limit is exceeded no further queue can contribute, so skip the
+                // offset lookups for the remaining queues altogether.
+                if (messageViewList.size() > QUERY_BY_TOPIC_RESULT_LIMIT) {
+                    break;
+                }
+                long minOffset = consumer.searchOffset(mq, begin);
+                long maxOffset = consumer.searchOffset(mq, end);
+                READQ:
+                for (long offset = minOffset; offset <= maxOffset; ) {
+                    try {
+                        if (messageViewList.size() > QUERY_BY_TOPIC_RESULT_LIMIT) {
+                            break;
+                        }
+                        PullResult pullResult = consumer.pull(mq, subExpression, offset, PULL_BATCH_SIZE);
+                        offset = pullResult.getNextBeginOffset();
+                        switch (pullResult.getPullStatus()) {
+                            case FOUND:
+                                messageViewList.addAll(toViewsInRange(pullResult.getMsgFoundList(), begin, end));
+                                break;
+                            case NO_MATCHED_MSG:
+                            case NO_NEW_MSG:
+                            case OFFSET_ILLEGAL:
+                                break READQ;
+                        }
+                    }
+                    catch (Exception e) {
+                        // Give up on this queue but keep what was collected so far; record
+                        // the failure instead of dropping it silently.
+                        logger.warn("op=queryMessageByTopicPullError topic=" + topic + " mq=" + mq + " offset=" + offset, e);
+                        break;
+                    }
+                }
+            }
+            Collections.sort(messageViewList, new Comparator<MessageView>() {
+                @Override
+                public int compare(MessageView o1, MessageView o2) {
+                    // Newest first.
+                    return Long.compare(o2.getStoreTimestamp(), o1.getStoreTimestamp());
+                }
+            });
+            return messageViewList;
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        finally {
+            consumer.shutdown();
+        }
+    }
+
+    /**
+     * Converts pulled messages to views with their bodies removed and keeps only those
+     * whose store timestamp lies within {@code [begin, end]}; rejected ones are logged.
+     */
+    private List<MessageView> toViewsInRange(List<MessageExt> found, final long begin, final long end) {
+        List<MessageView> views = Lists.transform(found, new Function<MessageExt, MessageView>() {
+            @Override
+            public MessageView apply(MessageExt messageExt) {
+                messageExt.setBody(null);
+                return MessageView.fromMessageExt(messageExt);
+            }
+        });
+        return Lists.newArrayList(Iterables.filter(views, new Predicate<MessageView>() {
+            @Override
+            public boolean apply(MessageView messageView) {
+                if (messageView.getStoreTimestamp() < begin || messageView.getStoreTimestamp() > end) {
+                    logger.info("begin={} end={} time not in range {} {}", begin, end, messageView.getStoreTimestamp(), new Date(messageView.getStoreTimestamp()).toString());
+                }
+                return messageView.getStoreTimestamp() >= begin && messageView.getStoreTimestamp() <= end;
+            }
+        }));
+    }
+
+    /**
+     * Fetches the consume track of a message.
+     *
+     * @param msg message to track
+     * @return the track list, or an empty list when the lookup fails (the failure is logged)
+     */
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg) {
+        try {
+            return mqAdminExt.messageTrackDetail(msg);
+        }
+        catch (Exception e) {
+            logger.error("op=messageTrackDetailError", e);
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * Asks a consumer of the group to consume one message directly. When no client id is
+     * given, the first connection of the group that has a non-blank client id is used.
+     *
+     * @param topic topic of the message
+     * @param msgId id of the message
+     * @param consumerGroup group that should consume the message
+     * @param clientId target client; may be blank to let the method pick one
+     * @return the result reported by the admin client
+     * @throws IllegalStateException when the group has no connection with a client id
+     */
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
+        String clientId) {
+        if (StringUtils.isNotBlank(clientId)) {
+            try {
+                return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        try {
+            ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+            for (Connection connection : consumerConnection.getConnectionSet()) {
+                if (StringUtils.isBlank(connection.getClientId())) {
+                    continue;
+                }
+                logger.info("clientId={}", connection.getClientId());
+                return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
+            }
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        throw new IllegalStateException("NO CONSUMER");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
new file mode 100644
index 0000000..d3a109d
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Throwables;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
+import org.apache.rocketmq.console.service.MonitorService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.springframework.stereotype.Service;
+
+/**
+ * Keeps the consumer monitor configurations in memory and mirrors every change to a
+ * JSON file under the console data path; the file is read back on start-up.
+ */
+@Service
+public class MonitorServiceImpl implements MonitorService {
+
+
+    @Resource
+    private RMQConfigure rmqConfigure;
+
+    private Map<String, ConsumerMonitorConfig> configMap = new ConcurrentHashMap<>();
+
+    /**
+     * Stores the configuration under the given name and rewrites the data file.
+     *
+     * @param name key of the configuration
+     * @param config configuration to store
+     * @return always {@code true}; a failed file write surfaces as an exception
+     */
+    @Override
+    public boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config) {
+        // TODO: the in-memory map is already updated if the file write below fails.
+        configMap.put(name, config);
+        writeToFile(getConsumerMonitorConfigDataPath(), configMap);
+        return true;
+    }
+
+    /**
+     * @return the live configuration map held by this service (not a copy)
+     */
+    @Override
+    public Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig() {
+        return configMap;
+    }
+
+    /**
+     * @param consumeGroupName key of the configuration
+     * @return the stored configuration, or {@code null} when none is stored under that key
+     */
+    @Override
+    public ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName) {
+        return configMap.get(consumeGroupName);
+    }
+
+    /**
+     * Removes the configuration stored under the given name and rewrites the data file.
+     *
+     * @param consumeGroupName key of the configuration to remove
+     * @return always {@code true}; a failed file write surfaces as an exception
+     */
+    @Override
+    public boolean deleteConsumerMonitor(String consumeGroupName) {
+        configMap.remove(consumeGroupName);
+        writeToFile(getConsumerMonitorConfigDataPath(), configMap);
+        return true;
+    }
+
+    /**
+     * @return {@code <rocketmq.console.data.path>/monitor/consumerMonitorConfig.json}
+     */
+    private String getConsumerMonitorConfigDataPath() {
+        return rmqConfigure.getRocketMqConsoleDataPath() + File.separatorChar + "monitor" + File.separatorChar + "consumerMonitorConfig.json";
+    }
+
+    /**
+     * @return the data file path with a {@code .bak} suffix
+     */
+    private String getConsumerMonitorConfigDataPathBackUp() {
+        return getConsumerMonitorConfigDataPath() + ".bak";
+    }
+
+    /** Serializes the object with {@code JsonUtil.obj2String} and writes it to the path. */
+    private void writeToFile(String path, Object data) {
+        writeDataJsonToFile(path, JsonUtil.obj2String(data));
+    }
+
+    /** Writes the string to the path, rethrowing any failure via {@code Throwables.propagate}. */
+    private void writeDataJsonToFile(String path, String dataStr) {
+        try {
+            MixAll.string2File(dataStr, path);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Restores the configuration map after construction, reading the data file first and
+     * the {@code .bak} file when the data file yields no content. The empty default map
+     * is kept when neither file yields content or the content does not produce a map.
+     */
+    @PostConstruct
+    private void loadData() {
+        String content = MixAll.file2String(getConsumerMonitorConfigDataPath());
+        if (content == null) {
+            content = MixAll.file2String(getConsumerMonitorConfigDataPathBackUp());
+        }
+        if (content == null) {
+            return;
+        }
+        Map<String, ConsumerMonitorConfig> loaded = JsonUtil.string2Obj(content, new TypeReference<ConcurrentHashMap<String, ConsumerMonitorConfig>>() {
+        });
+        // Never replace the map with null: every other method dereferences configMap.
+        if (loaded != null) {
+            configMap = loaded;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
new file mode 100644
index 0000000..84e6d2f
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.service.AbstractCommonService;
+import org.apache.rocketmq.console.service.OpsService;
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+
+/**
+ * Operations page support: exposes and updates the name server address and VIP channel
+ * settings held by {@link RMQConfigure}, and runs the registered status checkers.
+ */
+@Service
+public class OpsServiceImpl extends AbstractCommonService implements OpsService {
+
+    @Resource
+    private RMQConfigure rMQConfigure;
+
+    @Resource
+    private List<RocketMqChecker> rocketMqCheckerList;
+
+    /**
+     * Collects the data shown on the home page.
+     *
+     * @return a map with {@code "namesvrAddrList"} (the configured address split on
+     *         {@code ';'}) and {@code "useVIPChannel"} (the configured flag as Boolean)
+     */
+    @Override
+    public Map<String, Object> homePageInfo() {
+        Map<String, Object> homePageInfoMap = Maps.newHashMap();
+        String namesrvAddr = rMQConfigure.getNamesrvAddr();
+        // Splitter rejects null input, so an unset address is handled like an empty one.
+        homePageInfoMap.put("namesvrAddrList", Splitter.on(";").splitToList(namesrvAddr == null ? "" : namesrvAddr));
+        homePageInfoMap.put("useVIPChannel", Boolean.valueOf(rMQConfigure.getIsVIPChannel()));
+        return homePageInfoMap;
+    }
+
+    /**
+     * @param nameSvrAddrList new name server address value, stored as given
+     */
+    @Override
+    public void updateNameSvrAddrList(String nameSvrAddrList) {
+        rMQConfigure.setNamesrvAddr(nameSvrAddrList);
+    }
+
+    /**
+     * @return the name server address value currently held by the configuration
+     */
+    @Override
+    public String getNameSvrList() {
+        return rMQConfigure.getNamesrvAddr();
+    }
+
+    /**
+     * Runs every injected checker once.
+     *
+     * @return the result of each checker keyed by its type; a later checker of the same
+     *         type overwrites an earlier one
+     */
+    @Override
+    public Map<CheckerType, Object> rocketMqStatusCheck() {
+        Map<CheckerType, Object> checkResultMap = Maps.newHashMap();
+        for (RocketMqChecker rocketMqChecker : rocketMqCheckerList) {
+            checkResultMap.put(rocketMqChecker.checkerType(), rocketMqChecker.doCheck());
+        }
+        return checkResultMap;
+    }
+
+    /**
+     * @param useVIPChannel new VIP channel flag, stored as given
+     * @return always {@code true}
+     */
+    @Override
+    public boolean updateIsVIPChannel(String useVIPChannel) {
+        rMQConfigure.setIsVIPChannel(useVIPChannel);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
new file mode 100644
index 0000000..3e46958
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Throwables;
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.console.service.ProducerService;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.stereotype.Service;
+
+/**
+ * Producer lookups of the console, delegated to the admin client.
+ */
+@Service
+public class ProducerServiceImpl implements ProducerService {
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    /**
+     * Asks the admin client for the connections of a producer group on a topic.
+     *
+     * @param producerGroup producer group to examine
+     * @param topic topic the group produces to
+     * @return the connection info reported by the admin client; any failure is rethrown
+     *         through {@code Throwables.propagate}
+     */
+    @Override
+    public ProducerConnection getProducerConnection(String producerGroup, String topic) {
+        final ProducerConnection connection;
+        try {
+            connection = mqAdminExt.examineProducerConnectionInfo(producerGroup, topic);
+        }
+        catch (Exception failure) {
+            throw Throwables.propagate(failure);
+        }
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
new file mode 100644
index 0000000..117bcfd
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+import org.apache.rocketmq.console.service.AbstractCommonService;
+import org.apache.rocketmq.console.service.TopicService;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Topic operations of the console: list, inspect, create/update and delete topics and
+ * send a test message, all delegated to the admin client or a short-lived producer.
+ */
+@Service
+public class TopicServiceImpl extends AbstractCommonService implements TopicService {
+
+    @Autowired
+    private RMQConfigure rMQConfigure;
+
+    /**
+     * @return the topic list reported by the admin client
+     */
+    @Override
+    public TopicList fetchAllTopicList() {
+        try {
+            return mqAdminExt.fetchAllTopicList();
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * @param topic topic to examine
+     * @return the topic statistics reported by the admin client
+     */
+    @Override
+    public TopicStatsTable stats(String topic) {
+        try {
+            return mqAdminExt.examineTopicStats(topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * @param topic topic to examine
+     * @return the route data reported by the admin client
+     */
+    @Override
+    public TopicRouteData route(String topic) {
+        try {
+            return mqAdminExt.examineTopicRouteInfo(topic);
+        }
+        catch (Exception ex) {
+            throw Throwables.propagate(ex);
+        }
+    }
+
+    /**
+     * @param topic topic to examine
+     * @return the consumer groups reported by {@code queryTopicConsumeByWho}
+     */
+    @Override
+    public GroupList queryTopicConsumerInfo(String topic) {
+        try {
+            return mqAdminExt.queryTopicConsumeByWho(topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Creates or updates the topic on every broker selected by the request's cluster
+     * and broker name lists.
+     *
+     * @param topicCreateOrUpdateRequest topic settings plus the target clusters/brokers
+     * @throws IllegalArgumentException when a selected broker is not part of the cluster info
+     */
+    @Override
+    public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
+        TopicConfig topicConfig = new TopicConfig();
+        BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
+                topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
+                mqAdminExt.createAndUpdateTopicConfig(resolveBrokerAddr(clusterInfo, brokerName), topicConfig);
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    /**
+     * Reads the configuration of a topic from one broker.
+     *
+     * @param topic topic to examine
+     * @param brokerName broker to ask
+     * @return the topic configuration reported by that broker
+     * @throws IllegalArgumentException when the broker is not part of the cluster info
+     */
+    @Override
+    public TopicConfig examineTopicConfig(String topic, String brokerName) {
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        return mqAdminExt.examineTopicConfig(resolveBrokerAddr(clusterInfo, brokerName), topic);
+    }
+
+    /**
+     * Reads the configuration of a topic from every broker listed in its route data.
+     *
+     * @param topic topic to examine
+     * @return one entry per broker, each carrying that broker's name as its only broker name
+     */
+    @Override
+    public List<TopicConfigInfo> examineTopicConfig(String topic) {
+        List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList();
+        TopicRouteData topicRouteData = route(topic);
+        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+            TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
+            TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
+            BeanUtils.copyProperties(topicConfig, topicConfigInfo);
+            topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
+            topicConfigInfoList.add(topicConfigInfo);
+        }
+        return topicConfigInfoList;
+    }
+
+    /**
+     * Deletes a topic from the master brokers of one cluster and from the name servers.
+     * A blank cluster name deletes the topic from every cluster instead.
+     *
+     * @param topic topic to delete
+     * @param clusterName cluster to delete from; may be blank
+     * @return {@code true} when no exception was raised
+     */
+    @Override
+    public boolean deleteTopic(String topic, String clusterName) {
+        try {
+            if (StringUtils.isBlank(clusterName)) {
+                return deleteTopic(topic);
+            }
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
+            mqAdminExt.deleteTopicInBroker(masterSet, topic);
+            // Stays null when no name server address is configured; the null set is then
+            // handed to the admin client as is.
+            Set<String> nameServerSet = null;
+            if (StringUtils.isNotBlank(rMQConfigure.getNamesrvAddr())) {
+                String[] ns = rMQConfigure.getNamesrvAddr().split(";");
+                nameServerSet = new HashSet<String>(Arrays.asList(ns));
+            }
+            mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        return true;
+    }
+
+    /**
+     * Deletes a topic from every cluster found in the broker cluster info.
+     *
+     * @param topic topic to delete
+     * @return {@code true} when no exception was raised
+     */
+    @Override
+    public boolean deleteTopic(String topic) {
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
+            deleteTopic(topic, clusterName);
+        }
+        return true;
+    }
+
+    /**
+     * Deletes a topic from a single broker.
+     *
+     * @param brokerName broker to delete the topic from
+     * @param topic topic to delete
+     * @return {@code true} when no exception was raised
+     * @throws IllegalArgumentException when the broker is not part of the cluster info
+     */
+    @Override
+    public boolean deleteTopicInBroker(String brokerName, String topic) {
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            mqAdminExt.deleteTopicInBroker(Sets.newHashSet(resolveBrokerAddr(clusterInfo, brokerName)), topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        return true;
+    }
+
+    /**
+     * Sends one message with a producer that is started and shut down within this call.
+     *
+     * @param sendTopicMessageRequest topic, tag, key and body of the message
+     * @return the send result reported by the producer
+     */
+    @Override
+    public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
+        DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+        producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
+        try {
+            producer.start();
+            // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
+            Message msg = new Message(sendTopicMessageRequest.getTopic(),
+                sendTopicMessageRequest.getTag(),
+                sendTopicMessageRequest.getKey(),
+                sendTopicMessageRequest.getMessageBody().getBytes(java.nio.charset.StandardCharsets.UTF_8)
+            );
+            return producer.send(msg);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        finally {
+            producer.shutdown();
+        }
+    }
+
+    /**
+     * Picks the address to contact for a broker name, failing with a descriptive message
+     * instead of a bare NullPointerException when the broker is unknown.
+     */
+    private String resolveBrokerAddr(ClusterInfo clusterInfo, String brokerName) {
+        BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
+        if (brokerData == null) {
+            throw new IllegalArgumentException("broker not found in cluster info: " + brokerName);
+        }
+        return brokerData.selectBrokerAddr();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalExceptionHandler.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalExceptionHandler.java
new file mode 100644
index 0000000..37a8d64
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalExceptionHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.support;
+
+import javax.servlet.http.HttpServletRequest;
+import org.apache.rocketmq.console.exception.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * Turns exceptions raised by controllers under {@code org.apache.rocketmq.console}
+ * into a {@link JsonResult} response body.
+ */
+@ControllerAdvice(basePackages = "org.apache.rocketmq.console")
+public class GlobalExceptionHandler {
+    private Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
+
+    /**
+     * Logs the exception and wraps its message in a {@link JsonResult}.
+     *
+     * @param req the request being handled (not used)
+     * @param ex the exception raised by the controller
+     * @return a result carrying the code of a {@link ServiceException}, or -1 for any
+     *         other exception; {@code null} when {@code ex} is {@code null}
+     */
+    @ExceptionHandler(value = Exception.class)
+    @ResponseBody
+    public JsonResult<Object> jsonErrorHandler(HttpServletRequest req, Exception ex) throws Exception {
+        if (ex == null) {
+            return null;
+        }
+        logger.error("op=global_exception_handler_print_error", ex);
+        if (ex instanceof ServiceException) {
+            ServiceException serviceException = (ServiceException) ex;
+            return new JsonResult<Object>(serviceException.getCode(), serviceException.getMessage());
+        }
+        return new JsonResult<Object>(-1, ex.getMessage());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalRestfulResponseBodyAdvice.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalRestfulResponseBodyAdvice.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalRestfulResponseBodyAdvice.java
new file mode 100644
index 0000000..e67fa33
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/GlobalRestfulResponseBodyAdvice.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.support;
+
+import java.lang.annotation.Annotation;
+import 
org.apache.rocketmq.console.aspect.admin.annotation.OriginalControllerReturnValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.MethodParameter;
+import org.springframework.http.MediaType;
+import org.springframework.http.converter.HttpMessageConverter;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import 
org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
+
+/**
+ * Response-body advice for {@code org.apache.rocketmq.console} that wraps controller
+ * return values in a {@link JsonResult} before they are written.
+ *
+ * <p>Values are passed through untouched when the handler method carries
+ * {@link OriginalControllerReturnValue} or when the value already is a {@code JsonResult}.
+ */
+@ControllerAdvice(basePackages = "org.apache.rocketmq.console")
+public class GlobalRestfulResponseBodyAdvice implements ResponseBodyAdvice<Object> {
+
+    private Logger logger = LoggerFactory.getLogger(GlobalRestfulResponseBodyAdvice.class);
+
+    /**
+     * Wraps {@code obj} in a {@link JsonResult} unless wrapping is opted out of or unnecessary.
+     *
+     * @param obj the value returned by the controller method
+     * @param methodParameter the handler method's return type descriptor, used to look up
+     *        {@link OriginalControllerReturnValue}
+     * @return {@code obj} itself when the method is annotated with
+     *         {@code OriginalControllerReturnValue} or {@code obj} is already a
+     *         {@code JsonResult}; otherwise a new {@code JsonResult} holding {@code obj}
+     */
+    @Override
+    public Object beforeBodyWrite(
+        Object obj, MethodParameter methodParameter, MediaType mediaType,
+        Class<? extends HttpMessageConverter<?>> converterType,
+        ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse) {
+        Annotation originalControllerReturnValue =
+            methodParameter.getMethodAnnotation(OriginalControllerReturnValue.class);
+        if (originalControllerReturnValue != null) {
+            return obj;
+        }
+        if (obj instanceof JsonResult) {
+            // Already in the envelope format; do not wrap twice.
+            return obj;
+        }
+        // Parameterized (not raw) construction, matching GlobalExceptionHandler's JsonResult<Object>.
+        return new JsonResult<Object>(obj);
+    }
+
+    /**
+     * Applies this advice to every return type and message converter.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean supports(MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
+
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/JsonResult.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/JsonResult.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/JsonResult.java
new file mode 100644
index 0000000..f5e20dd
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/support/JsonResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.support;
+
+/**
+ * Generic response envelope holding a status code, a payload and an error message.
+ *
+ * @param <T> type of the payload carried in {@code data}
+ */
+public class JsonResult<T> {
+    private int status;
+    private T data;
+    private String errMsg;
+
+    /**
+     * Creates a result with status {@code 0}, the given payload and no error message.
+     *
+     * @param data the payload
+     */
+    public JsonResult(T data) {
+        this(0, data, null);
+    }
+
+    /**
+     * Creates a result with the given status and error message and no payload.
+     *
+     * @param status the status code
+     * @param errMsg the error message
+     */
+    public JsonResult(int status, String errMsg) {
+        this(status, null, errMsg);
+    }
+
+    /**
+     * Creates a fully specified result.
+     *
+     * @param status the status code
+     * @param data the payload
+     * @param errMsg the error message
+     */
+    public JsonResult(int status, T data, String errMsg) {
+        this.status = status;
+        this.data = data;
+        this.errMsg = errMsg;
+    }
+
+    /** @return the status code */
+    public int getStatus() {
+        return this.status;
+    }
+
+    /** @param status the status code to store */
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    /** @return the payload, possibly {@code null} */
+    public T getData() {
+        return this.data;
+    }
+
+    /** @param data the payload to store */
+    public void setData(T data) {
+        this.data = data;
+    }
+
+    /** @return the error message, possibly {@code null} */
+    public String getErrMsg() {
+        return this.errMsg;
+    }
+
+    /** @param errMsg the error message to store */
+    public void setErrMsg(String errMsg) {
+        this.errMsg = errMsg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/DashboardCollectTask.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/DashboardCollectTask.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/DashboardCollectTask.java
new file mode 100644
index 0000000..db1fbc4
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/DashboardCollectTask.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.task;
+
+import com.google.common.base.Stopwatch;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import 
org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
+import com.google.common.base.Throwables;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.service.DashboardCollectService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * Scheduled tasks that sample topic and broker throughput through {@link MQAdminExt},
+ * append the samples to the caches exposed by {@link DashboardCollectService}, and
+ * write those caches to per-day JSON files.
+ *
+ * <p>Every task returns immediately unless {@code rmqConfigure.isEnableDashBoardCollect()}
+ * is {@code true}.
+ */
+@Component
+public class DashboardCollectTask {
+    private Date currentDate = new Date();
+    @Resource
+    private MQAdminExt mqAdminExt;
+    @Resource
+    private RMQConfigure rmqConfigure;
+
+    @Resource
+    private DashboardCollectService dashboardCollectService;
+
+    private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class);
+
+    /**
+     * Samples, for every topic that is neither a retry nor a DLQ topic, the produce and
+     * consume statistics reported by each master broker and appends one
+     * {@code timestamp,inTPS,inMsgCntToday,outTPS,outMsgCntToday} line to the topic cache.
+     *
+     * <p>Statistics are best effort: a failing {@code viewBrokerStatsData} request for one
+     * broker is logged at debug level and contributes nothing to the sums. Any other
+     * failure is rethrown unchecked.
+     */
+    @Scheduled(cron = "30 0/1 * * * ?")
+    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
+    public void collectTopic() {
+        if (!rmqConfigure.isEnableDashBoardCollect()) {
+            return;
+        }
+        Date date = new Date();
+        // Must be created unstarted: it is started around each stats request below, and
+        // starting an already running Stopwatch throws IllegalStateException.
+        Stopwatch stopwatch = Stopwatch.createUnstarted();
+        try {
+            TopicList topicList = mqAdminExt.fetchAllTopicList();
+            Set<String> topicSet = topicList.getTopicList();
+            for (String topic : topicSet) {
+                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                    continue;
+                }
+
+                TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
+
+                GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+
+                double inTPS = 0;
+
+                long inMsgCntToday = 0;
+
+                double outTPS = 0;
+
+                long outMsgCntToday = 0;
+
+                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    if (masterAddr != null) {
+                        try {
+                            // reset() first so a previous request that threw (and therefore
+                            // never reached stop()) cannot leave the stopwatch running.
+                            stopwatch.reset();
+                            stopwatch.start();
+                            log.info("start time: {}", stopwatch.toString());
+                            BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
+                            stopwatch.stop();
+                            log.info("stop time : {}", stopwatch.toString());
+                            inTPS += bsd.getStatsMinute().getTps();
+                            inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
+                        }
+                        catch (Exception e) {
+                            // Best effort: skip this broker's contribution, but leave a trace.
+                            log.debug("viewBrokerStatsData TOPIC_PUT_NUMS failed, topic=" + topic + ", brokerAddr=" + masterAddr, e);
+                        }
+                    }
+                }
+
+                if (groupList != null && !groupList.getGroupList().isEmpty()) {
+
+                    for (String group : groupList.getGroupList()) {
+                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                            String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                            if (masterAddr != null) {
+                                String statsKey = String.format("%s@%s", topic, group);
+                                try {
+                                    BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
+                                    outTPS += bsd.getStatsMinute().getTps();
+                                    outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
+                                }
+                                catch (Exception e) {
+                                    // Best effort: skip this group/broker pair, but leave a trace.
+                                    log.debug("viewBrokerStatsData GROUP_GET_NUMS failed, statsKey=" + statsKey + ", brokerAddr=" + masterAddr, e);
+                                }
+                            }
+                        }
+                    }
+                }
+
+                List<String> list;
+                try {
+                    list = dashboardCollectService.getTopicMap().get(topic);
+                }
+                catch (ExecutionException e) {
+                    throw Throwables.propagate(e);
+                }
+                if (null == list) {
+                    list = Lists.newArrayList();
+                }
+
+                list.add(date.getTime() + "," + new BigDecimal(inTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + inMsgCntToday
+                    + "," + new BigDecimal(outTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + outMsgCntToday);
+                dashboardCollectService.getTopicMap().put(topic, list);
+
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Topic Collected Data in memory = {}", JsonUtil.obj2String(dashboardCollectService.getTopicMap().asMap()));
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    /**
+     * Samples the {@code getTotalTps} runtime statistic of every broker address in the
+     * cluster and appends one {@code timestamp,averageTps} line to the broker cache under
+     * the key {@code brokerName:brokerId}.
+     *
+     * <p>A broker whose runtime stats cannot be fetched, or that reports no
+     * {@code getTotalTps} entry, is skipped for this round. Any other failure is rethrown
+     * unchecked.
+     */
+    @Scheduled(cron = "0 0/1 * * * ?")
+    public void collectBroker() {
+        if (!rmqConfigure.isEnableDashBoardCollect()) {
+            return;
+        }
+        try {
+            Date date = new Date();
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+
+            // broker address -> "brokerName:brokerId"
+            Map<String, String> addresses = Maps.newHashMap();
+            for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+                HashMap<Long, String> addrs = clusterEntry.getValue().getBrokerAddrs();
+                Set<Map.Entry<Long, String>> addrsEntries = addrs.entrySet();
+                for (Map.Entry<Long, String> addrEntry : addrsEntries) {
+                    addresses.put(addrEntry.getValue(), clusterEntry.getKey() + ":" + addrEntry.getKey());
+                }
+            }
+            Set<Map.Entry<String, String>> entries = addresses.entrySet();
+            for (Map.Entry<String, String> entry : entries) {
+                List<String> list = dashboardCollectService.getBrokerMap().get(entry.getValue());
+                if (null == list) {
+                    list = Lists.newArrayList();
+                }
+                KVTable kvTable = fetchBrokerRuntimeStats(entry.getKey(), 3);
+                if (kvTable == null) {
+                    continue;
+                }
+                String totalTpsValue = kvTable.getTable().get("getTotalTps");
+                if (totalTpsValue == null) {
+                    // Without this guard one broker lacking the entry would abort the whole round.
+                    log.warn("broker runtime stats contain no getTotalTps entry, brokerAddr={}", entry.getKey());
+                    continue;
+                }
+                String[] tpsArray = totalTpsValue.split(" ");
+                BigDecimal totalTps = new BigDecimal(0);
+                for (String tps : tpsArray) {
+                    totalTps = totalTps.add(new BigDecimal(tps));
+                }
+                BigDecimal averageTps = totalTps.divide(new BigDecimal(tpsArray.length), 5, BigDecimal.ROUND_HALF_UP);
+                list.add(date.getTime() + "," + averageTps.toString());
+                dashboardCollectService.getBrokerMap().put(entry.getValue(), list);
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Broker Collected Data in memory = {}", JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap()));
+            }
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Fetches a broker's runtime stats, making at most {@code retryTime} attempts with a
+     * one second pause between attempts.
+     *
+     * @param brokerAddr address of the broker to query
+     * @param retryTime number of attempts still allowed
+     * @return the stats table, or {@code null} when no attempt succeeded
+     * @throws RuntimeException wrapping {@link InterruptedException} when the pause between
+     *         attempts is interrupted; the thread's interrupt flag is restored first
+     */
+    private KVTable fetchBrokerRuntimeStats(String brokerAddr, Integer retryTime) {
+        if (retryTime <= 0) {
+            return null;
+        }
+        try {
+            return mqAdminExt.fetchBrokerRuntimeStats(brokerAddr);
+        }
+        catch (Exception e) {
+            log.warn("fetchBrokerRuntimeStats failed, brokerAddr=" + brokerAddr + ", attempts left=" + (retryTime - 1), e);
+            if (retryTime <= 1) {
+                // That was the last attempt; no point in sleeping first.
+                return null;
+            }
+            try {
+                Thread.sleep(1000);
+            }
+            catch (InterruptedException e1) {
+                Thread.currentThread().interrupt();
+                throw Throwables.propagate(e1);
+            }
+            // Return the retry's outcome instead of discarding it.
+            return fetchBrokerRuntimeStats(brokerAddr, retryTime - 1);
+        }
+    }
+
+    /**
+     * Merges the broker and topic caches with today's data files and rewrites both files.
+     *
+     * <p>The files are {@code <consoleCollectData><yyyy-MM-dd>.json} and
+     * {@code <consoleCollectData><yyyy-MM-dd>_topic.json}. When the calendar day differs
+     * from the day remembered in {@code currentDate}, both caches are invalidated first.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading or
+     *         writing the files
+     */
+    @Scheduled(cron = "0/5 * * * * ?")
+    public void saveData() {
+        if (!rmqConfigure.isEnableDashBoardCollect()) {
+            return;
+        }
+        //one day refresh cache one time
+        String dataLocationPath = rmqConfigure.getConsoleCollectData();
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+        String nowDateStr = format.format(new Date());
+        String currentDateStr = format.format(currentDate);
+        if (!currentDateStr.equals(nowDateStr)) {
+            dashboardCollectService.getBrokerMap().invalidateAll();
+            dashboardCollectService.getTopicMap().invalidateAll();
+            currentDate = new Date();
+        }
+        File brokerFile = new File(dataLocationPath + nowDateStr + ".json");
+        File topicFile = new File(dataLocationPath + nowDateStr + "_topic" + ".json");
+        try {
+            Map<String, List<String>> brokerFileMap;
+            Map<String, List<String>> topicFileMap;
+            if (brokerFile.exists()) {
+                brokerFileMap = dashboardCollectService.jsonDataFile2map(brokerFile);
+            }
+            else {
+                brokerFileMap = Maps.newHashMap();
+                Files.createParentDirs(brokerFile);
+            }
+
+            if (topicFile.exists()) {
+                topicFileMap = dashboardCollectService.jsonDataFile2map(topicFile);
+            }
+            else {
+                topicFileMap = Maps.newHashMap();
+                Files.createParentDirs(topicFile);
+            }
+
+            brokerFile.createNewFile();
+            topicFile.createNewFile();
+
+            writeFile(dashboardCollectService.getBrokerMap(), brokerFileMap, brokerFile);
+            writeFile(dashboardCollectService.getTopicMap(), topicFileMap, topicFile);
+            if (log.isDebugEnabled()) {
+                log.debug("Broker Collected Data in memory = {}", JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap()));
+                log.debug("Topic Collected Data in memory = {}", JsonUtil.obj2String(dashboardCollectService.getTopicMap().asMap()));
+            }
+
+        }
+        catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Combines the cache content with what was read from the data file and writes the
+     * result back to {@code file} as JSON.
+     *
+     * <p>Keys present in the file whose cache entry is missing or empty are also put back
+     * into the cache.
+     *
+     * @param map the in-memory cache
+     * @param fileMap the content previously read from {@code file}; empty when there was none
+     * @param file the destination file
+     * @throws IOException when the file cannot be written
+     */
+    private void writeFile(LoadingCache<String, List<String>> map, Map<String, List<String>> fileMap,
+        File file) throws IOException {
+        Map<String, List<String>> newMap = map.asMap();
+        Map<String, List<String>> resultMap = Maps.newHashMap();
+        if (fileMap.size() == 0) {
+            resultMap = newMap;
+        }
+        else {
+            for (Map.Entry<String, List<String>> entry : fileMap.entrySet()) {
+                List<String> oldList = entry.getValue();
+                List<String> newList = newMap.get(entry.getKey());
+                List<String> merged = appendData(newList, oldList);
+                resultMap.put(entry.getKey(), merged);
+                if (newList == null || newList.size() == 0) {
+                    map.put(entry.getKey(), merged);
+                }
+            }
+
+            // Keys that exist only in the cache (or whose file entry is empty).
+            for (Map.Entry<String, List<String>> entry : newMap.entrySet()) {
+                List<String> oldList = fileMap.get(entry.getKey());
+                if (oldList == null || oldList.size() == 0) {
+                    resultMap.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        // NOTE(review): getBytes() uses the platform default charset; confirm it matches the
+        // charset jsonDataFile2map reads with before switching to an explicit one.
+        Files.write(JsonUtil.obj2String(resultMap).getBytes(), file);
+    }
+
+    /**
+     * Merges the samples from the file with the samples from the cache.
+     *
+     * @param newTpsList samples from the cache, each starting with {@code timestamp,}
+     * @param oldTpsList samples from the file, each starting with {@code timestamp,}
+     * @return {@code oldTpsList} when the new list is null or empty; {@code newTpsList} when
+     *         the old list is null or empty; old followed by new when the last old timestamp
+     *         is strictly before the first new timestamp; otherwise {@code newTpsList}
+     */
+    private List<String> appendData(List<String> newTpsList, List<String> oldTpsList) {
+        if (newTpsList == null || newTpsList.size() == 0) {
+            return oldTpsList;
+        }
+        if (oldTpsList == null || oldTpsList.size() == 0) {
+            return newTpsList;
+        }
+        String oldLastTps = oldTpsList.get(oldTpsList.size() - 1);
+        long oldLastTimestamp = Long.parseLong(oldLastTps.split(",")[0]);
+        String newFirstTps = newTpsList.get(0);
+        long newFirstTimestamp = Long.parseLong(newFirstTps.split(",")[0]);
+        if (oldLastTimestamp < newFirstTimestamp) {
+            List<String> result = Lists.newArrayList();
+            result.addAll(oldTpsList);
+            result.addAll(newTpsList);
+            return result;
+        }
+        return newTpsList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/MonitorTask.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/MonitorTask.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/MonitorTask.java
new file mode 100644
index 0000000..0db07be
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/task/MonitorTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.task;
+
+import java.util.Map;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.service.ConsumerService;
+import org.apache.rocketmq.console.service.MonitorService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Scans the configured consumer groups and logs those whose consume state violates
+ * the thresholds of their {@link ConsumerMonitorConfig}.
+ */
+@Component
+public class MonitorTask {
+    private Logger logger = LoggerFactory.getLogger(MonitorTask.class);
+
+    @Resource
+    private MonitorService monitorService;
+
+    @Resource
+    private ConsumerService consumerService;
+
+    /**
+     * Queries every consumer group that has a monitor config and logs the group's
+     * consume info when it is outside the configured limits.
+     */
+//    @Scheduled(cron = "* * * * * ?")
+    public void scanProblemConsumeGroup() {
+        for (Map.Entry<String, ConsumerMonitorConfig> monitored : monitorService.queryConsumerMonitorConfig().entrySet()) {
+            GroupConsumeInfo groupInfo = consumerService.queryGroup(monitored.getKey());
+            if (!violatesThresholds(groupInfo, monitored.getValue())) {
+                continue;
+            }
+            logger.info("op=look consumeInfo {}", JsonUtil.obj2String(groupInfo)); // notify the alert system
+        }
+    }
+
+    /**
+     * Tells whether the group's count is below the configured minimum or its total
+     * diff is above the configured maximum.
+     */
+    private boolean violatesThresholds(GroupConsumeInfo groupInfo, ConsumerMonitorConfig limits) {
+        return groupInfo.getCount() < limits.getMinCount()
+            || groupInfo.getDiffTotal() > limits.getMaxDiffTotal();
+    }
+
+}

Reply via email to