http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
new file mode 100644
index 0000000..27c5f92
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
+    private String group;
+    private String version;
+    private int count;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private int consumeTps;
+    private long diffTotal = -1;
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public long getDiffTotal() {
+        return diffTotal;
+    }
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+    @Override
+    public int compareTo(GroupConsumeInfo o) {
+        if (this.count != o.count) {
+            return o.count - this.count;
+        }
+
+        return (int) (o.diffTotal - diffTotal);
+    }
+
+    public int getConsumeTps() {
+        return consumeTps;
+    }
+
+    public void setConsumeTps(int consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java
new file mode 100644
index 0000000..4011cad
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import org.springframework.beans.BeanUtils;
+
+import java.net.SocketAddress;
+import java.util.Map;
+
+public class MessageView {
+
+    /** from MessageExt **/
+    private int queueId;
+    private int storeSize;
+    private long queueOffset;
+    private int sysFlag;
+    private long bornTimestamp;
+    private SocketAddress bornHost;
+    private long storeTimestamp;
+    private SocketAddress storeHost;
+    private String msgId;
+    private long commitLogOffset;
+    private int bodyCRC;
+    private int reconsumeTimes;
+    private long preparedTransactionOffset;
+    /**from MessageExt**/
+
+    /** from Message **/
+    private String topic;
+    private int flag;
+    private Map<String, String> properties;
+    private String messageBody; // body
+
+    /** from Message **/
+
+    public static MessageView fromMessageExt(MessageExt messageExt) {
+        MessageView messageView = new MessageView();
+        BeanUtils.copyProperties(messageExt, messageView);
+        if (messageExt.getBody() != null) {
+            messageView.setMessageBody(new String(messageExt.getBody(), 
Charsets.UTF_8));
+        }
+        return messageView;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getFlag() {
+        return flag;
+    }
+
+    public void setFlag(int flag) {
+        this.flag = flag;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public int getStoreSize() {
+        return storeSize;
+    }
+
+    public void setStoreSize(int storeSize) {
+        this.storeSize = storeSize;
+    }
+
+    public long getQueueOffset() {
+        return queueOffset;
+    }
+
+    public void setQueueOffset(long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+    public int getSysFlag() {
+        return sysFlag;
+    }
+
+    public void setSysFlag(int sysFlag) {
+        this.sysFlag = sysFlag;
+    }
+
+    public long getBornTimestamp() {
+        return bornTimestamp;
+    }
+
+    public void setBornTimestamp(long bornTimestamp) {
+        this.bornTimestamp = bornTimestamp;
+    }
+
+    public SocketAddress getBornHost() {
+        return bornHost;
+    }
+
+    public void setBornHost(SocketAddress bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+    public SocketAddress getStoreHost() {
+        return storeHost;
+    }
+
+    public void setStoreHost(SocketAddress storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+    public void setCommitLogOffset(long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+    public int getBodyCRC() {
+        return bodyCRC;
+    }
+
+    public void setBodyCRC(int bodyCRC) {
+        this.bodyCRC = bodyCRC;
+    }
+
+    public int getReconsumeTimes() {
+        return reconsumeTimes;
+    }
+
+    public void setReconsumeTimes(int reconsumeTimes) {
+        this.reconsumeTimes = reconsumeTimes;
+    }
+
+    public long getPreparedTransactionOffset() {
+        return preparedTransactionOffset;
+    }
+
+    public void setPreparedTransactionOffset(long preparedTransactionOffset) {
+        this.preparedTransactionOffset = preparedTransactionOffset;
+    }
+
+    public String getMessageBody() {
+        return messageBody;
+    }
+
+    public void setMessageBody(String messageBody) {
+        this.messageBody = messageBody;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
new file mode 100644
index 0000000..a6d67a8
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.springframework.beans.BeanUtils;
+
+public class QueueStatInfo {
+    private String brokerName;
+    private int queueId;
+    private long brokerOffset;
+    private long consumerOffset;
+    private String clientInfo;
+    private long lastTimestamp;
+
+    public static QueueStatInfo fromOffsetTableEntry(MessageQueue key, 
OffsetWrapper value) {
+        QueueStatInfo queueStatInfo = new QueueStatInfo();
+        BeanUtils.copyProperties(key, queueStatInfo);
+        BeanUtils.copyProperties(value, queueStatInfo);
+        return queueStatInfo;
+    }
+
+    public String getClientInfo() {
+        return clientInfo;
+    }
+
+    public void setClientInfo(String clientInfo) {
+        this.clientInfo = clientInfo;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public long getBrokerOffset() {
+        return brokerOffset;
+    }
+
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    public long getConsumerOffset() {
+        return consumerOffset;
+    }
+
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
+
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
new file mode 100644
index 0000000..bd6d8e2
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class TopicConsumerInfo {
+    private String topic;
+    private long diffTotal;
+    private long lastTimestamp;
+    private List<QueueStatInfo> queueStatInfoList = Lists.newArrayList();
+
+    public TopicConsumerInfo(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getDiffTotal() {
+        return diffTotal;
+    }
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+    public List<QueueStatInfo> getQueueStatInfoList() {
+        return queueStatInfoList;
+    }
+
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
+
+    public void appendQueueStatInfo(QueueStatInfo queueStatInfo) {
+        queueStatInfoList.add(queueStatInfo);
+        diffTotal = diffTotal + (queueStatInfo.getBrokerOffset() - 
queueStatInfo.getConsumerOffset());
+        lastTimestamp = Math.max(lastTimestamp, 
queueStatInfo.getLastTimestamp());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
new file mode 100644
index 0000000..cee155c
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+
+import java.util.List;
+
+public class ConsumerConfigInfo {
+    private List<String> clusterNameList;
+
+    private List<String> brokerNameList;
+    private SubscriptionGroupConfig subscriptionGroupConfig;
+
+    public ConsumerConfigInfo() {
+    }
+
+    public ConsumerConfigInfo(List<String> brokerNameList, 
SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.brokerNameList = brokerNameList;
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+
+    public List<String> getClusterNameList() {
+        return clusterNameList;
+    }
+
+    public void setClusterNameList(List<String> clusterNameList) {
+        this.clusterNameList = clusterNameList;
+    }
+
+    public List<String> getBrokerNameList() {
+        return brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+
+    public SubscriptionGroupConfig getSubscriptionGroupConfig() {
+        return subscriptionGroupConfig;
+    }
+
+    public void setSubscriptionGroupConfig(SubscriptionGroupConfig 
subscriptionGroupConfig) {
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
new file mode 100644
index 0000000..152256b
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import java.util.List;
+
+public class DeleteSubGroupRequest {
+    private String groupName;
+    private List<String> brokerNameList;
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public List<String> getBrokerNameList() {
+        return brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
new file mode 100644
index 0000000..22263af
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import java.util.List;
+
+public class ResetOffsetRequest {
+    private List<String> consumerGroupList;
+    private String topic;
+    private long resetTime;
+    private boolean force;
+
+    public List<String> getConsumerGroupList() {
+        return consumerGroupList;
+    }
+
+    public void setConsumerGroupList(List<String> consumerGroupList) {
+        this.consumerGroupList = consumerGroupList;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getResetTime() {
+        return resetTime;
+    }
+
+    public void setResetTime(long resetTime) {
+        this.resetTime = resetTime;
+    }
+
+    public boolean isForce() {
+        return force;
+    }
+
+    public void setForce(boolean force) {
+        this.force = force;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
new file mode 100644
index 0000000..c7ffa8a
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+public class SendTopicMessageRequest {
+    private String topic;
+    private String key;
+    private String tag;
+    private String messageBody;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    public String getMessageBody() {
+        return messageBody;
+    }
+
+    public void setMessageBody(String messageBody) {
+        this.messageBody = messageBody;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
new file mode 100644
index 0000000..e1f56d4
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import com.google.common.base.Objects;
+
+import java.util.List;
+
+public class TopicConfigInfo {
+
+    private List<String> clusterNameList;
+    private List<String> brokerNameList;
+
+    /** topicConfig */
+    private String topicName;
+    private int writeQueueNums;
+    private int readQueueNums;
+    private int perm;
+    private boolean order;
+
+    public List<String> getClusterNameList() {
+        return clusterNameList;
+    }
+
+    public void setClusterNameList(List<String> clusterNameList) {
+        this.clusterNameList = clusterNameList;
+    }
+
+    /** topicConfig */
+
+
+
+    public List<String> getBrokerNameList() {
+        return brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public int getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+    public void setWriteQueueNums(int writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+    public int getReadQueueNums() {
+        return readQueueNums;
+    }
+
+    public void setReadQueueNums(int readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+    public int getPerm() {
+        return perm;
+    }
+
+    public void setPerm(int perm) {
+        this.perm = perm;
+    }
+
+    public boolean isOrder() {
+        return order;
+    }
+
+    public void setOrder(boolean order) {
+        this.order = order;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        TopicConfigInfo that = (TopicConfigInfo) o;
+        return writeQueueNums == that.writeQueueNums &&
+            readQueueNums == that.readQueueNums &&
+            perm == that.perm &&
+            order == that.order &&
+            Objects.equal(topicName, that.topicName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(topicName, writeQueueNums, readQueueNums, 
perm, order);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
new file mode 100644
index 0000000..53a0e21
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.commons.collections.CollectionUtils;
+
+public abstract class AbstractCommonService {
+    @Resource
+    protected MQAdminExt mqAdminExt;
+    protected final Set<String> changeToBrokerNameSet(HashMap<String, 
Set<String>> clusterAddrTable,
+        List<String> clusterNameList, List<String> brokerNameList) {
+        Set<String> finalBrokerNameList = Sets.newHashSet();
+        if (CollectionUtils.isNotEmpty(clusterNameList)) {
+            try {
+                for (String clusterName : clusterNameList) {
+                    
finalBrokerNameList.addAll(clusterAddrTable.get(clusterName));
+                }
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        }
+        if (CollectionUtils.isNotEmpty(brokerNameList)) {
+            finalBrokerNameList.addAll(brokerNameList);
+        }
+        return finalBrokerNameList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
new file mode 100644
index 0000000..43489fb
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import java.util.Properties;
+
+public interface ClusterService {
+    Map<String, Object> list();
+
+    Properties getBrokerConfig(String brokerAddr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
new file mode 100644
index 0000000..1d9ac12
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.model.TopicConsumerInfo;
+import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface ConsumerService {
+    List<GroupConsumeInfo> queryGroupList();
+
+    GroupConsumeInfo queryGroup(String consumerGroup);
+
+
+    List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+
+    List<TopicConsumerInfo> queryConsumeStatsList(String topic, String 
groupName);
+
+    Map<String, TopicConsumerInfo> queryConsumeStatsListByTopicName(String 
topic);
+
+    Map<String /*consumerGroup*/, ConsumerGroupRollBackStat> 
resetOffset(ResetOffsetRequest resetOffsetRequest);
+
+    List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group);
+
+    boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest);
+
+    boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo 
consumerConfigInfo);
+
+    Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
+
+    ConsumerConnection getConsumerConnection(String consumerGroup);
+
+    ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String 
clientId, boolean jstack);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
new file mode 100644
index 0000000..27a3645
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import com.google.common.cache.LoadingCache;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+public interface DashboardCollectService {
+    // todo just move the task to 
org.apache.rocketmq.console.task.DashboardCollectTask
+    // the code can be reconstruct
+    LoadingCache<String, List<String>> getBrokerMap();
+
+    LoadingCache<String, List<String>> getTopicMap();
+
+    Map<String, List<String>> jsonDataFile2map(File file);
+
+    Map<String, List<String>> getBrokerCache(String date);
+
+    Map<String, List<String>> getTopicCache(String date);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
new file mode 100644
index 0000000..a4cf798
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.List;
+import java.util.Map;
+
+public interface DashboardService {
+    /**
+     * @param date format yyyy-MM-dd
+     */
+    Map<String, List<String>> queryBrokerData(String date);
+
+    /**
+     * @param date format yyyy-MM-dd
+     */
+    Map<String, List<String>> queryTopicData(String date);
+
+    /**
+     * @param date format yyyy-MM-dd
+     * @param topicName
+     */
+    List<String> queryTopicData(String date, String topicName);
+
+    List<String> queryTopicCurrentData();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java
new file mode 100644
index 0000000..e56b4d8
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.console.model.MessageView;
+
+import java.util.List;
+
+public interface MessageService {
+    /**
+     * @param subject
+     * @param msgId
+     */
+    Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final 
String msgId);
+
+    List<MessageView> queryMessageByTopicAndKey(final String topic, final 
String key);
+
+    /**
+     * @param topic
+     * @param begin
+     * @param end
+     * org.apache.rocketmq.tools.command.message.PrintMessageSubCommand
+     */
+    List<MessageView> queryMessageByTopic(final String topic, final long begin,
+        final long end);
+
+    List<MessageTrack> messageTrackDetail(MessageExt msg);
+
+    ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String 
msgId, String consumerGroup,
+        String clientId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
new file mode 100644
index 0000000..4bf659c
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
+
+public interface MonitorService {
+    boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig 
config);
+
+    Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig();
+
+    ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String 
consumeGroupName);
+
+    boolean deleteConsumerMonitor(String consumeGroupName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java
new file mode 100644
index 0000000..d3bd68c
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import org.apache.rocketmq.console.service.checker.CheckerType;
+
+public interface OpsService {
+    Map<String, Object> homePageInfo();
+
+    void updateNameSvrAddrList(String nameSvrAddrList);
+
+    String getNameSvrList();
+
+    Map<CheckerType,Object> rocketMqStatusCheck();
+
+    boolean updateIsVIPChannel(String useVIPChannel);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
new file mode 100644
index 0000000..cda7c48
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+
+public interface ProducerService {
+    ProducerConnection getProducerConnection(String producerGroup, String 
topic);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java
new file mode 100644
index 0000000..41a6b3b
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+
+import java.util.List;
+
+public interface TopicService {
+    TopicList fetchAllTopicList();
+
+    TopicStatsTable stats(String topic);
+
+    TopicRouteData route(String topic);
+
+    GroupList queryTopicConsumerInfo(String topic);
+
+    void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest);
+
+    TopicConfig examineTopicConfig(String topic, String brokerName);
+
+    List<TopicConfigInfo> examineTopicConfig(String topic);
+
+    boolean deleteTopic(String topic, String clusterName);
+
+    boolean deleteTopic(String topic);
+
+    boolean deleteTopicInBroker(String brokerName, String topic);
+
+    SendResult sendTopicMessageRequest(SendTopicMessageRequest 
sendTopicMessageRequest);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
new file mode 100644
index 0000000..8a85008
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker;
+
+public enum CheckerType {
+    CLUSTER_HEALTH_CHECK,
+    TOPIC_ONLY_ONE_BROKER_CHECK
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
new file mode 100644
index 0000000..3b8f58d
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker;
+
+public interface RocketMqChecker {
+    public Object doCheck();
+
+    public CheckerType checkerType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
new file mode 100644
index 0000000..5c2c893
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker.impl;
+
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClusterHealthCheckerImpl implements RocketMqChecker {
+    @Override
+    public Object doCheck() {
+        return null;
+    }
+
+    @Override
+    public CheckerType checkerType() {
+        return CheckerType.CLUSTER_HEALTH_CHECK;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
new file mode 100644
index 0000000..0f06a13
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker.impl;
+
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+
+@Service
+public class TopicOnlyOneBrokerCheckerImpl implements RocketMqChecker {
+    @Override
+    public Object doCheck() {
+        return null;
+    }
+
+    @Override
+    public CheckerType checkerType() {
+        return CheckerType.TOPIC_ONLY_ONE_BROKER_CHECK;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
new file mode 100644
index 0000000..0eba3a5
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.client;
+
+import com.google.common.base.Throwables;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.RollbackStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static 
org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
+
+@Service
+public class MQAdminExtImpl implements MQAdminExt {
+    private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
+
+    public MQAdminExtImpl() {
+    }
+
+    @Override
+    public void updateBrokerConfig(String brokerAddr, Properties properties)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, 
properties);
+    }
+
+    @Override
+    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, 
config);
+    }
+
+    @Override
+    public void createAndUpdateSubscriptionGroupConfig(String addr, 
SubscriptionGroupConfig config)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr,
 config);
+    }
+
+    @Override
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, 
String group) {
+        RemotingClient remotingClient = 
MQAdminInstance.threadLocalRemotingClient();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
+        RemotingCommand response = null;
+        try {
+            response = remotingClient.invokeSync(addr, request, 3000);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = 
decode(response.getBody(), SubscriptionGroupWrapper.class);
+                return 
subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
+            }
+            default:
+                throw Throwables.propagate(new 
MQBrokerException(response.getCode(), response.getRemark()));
+        }
+    }
+
+    @Override
+    public TopicConfig examineTopicConfig(String addr, String topic) {
+        RemotingClient remotingClient = 
MQAdminInstance.threadLocalRemotingClient();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+        RemotingCommand response = null;
+        try {
+            response = remotingClient.invokeSync(addr, request, 3000);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper = 
decode(response.getBody(), TopicConfigSerializeWrapper.class);
+                return 
topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
+            }
+            default:
+                throw Throwables.propagate(new 
MQBrokerException(response.getCode(), response.getRemark()));
+        }
+    }
+
+    @Override
+    public TopicStatsTable examineTopicStats(String topic)
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
+    }
+
+    @Override
+    public TopicList fetchAllTopicList() throws RemotingException, 
MQClientException, InterruptedException {
+        TopicList topicList = 
MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
+        logger.debug("op=look={}", 
JsonUtil.obj2String(topicList.getTopicList()));
+        return topicList;
+    }
+
+    @Override
+    public KVTable fetchBrokerRuntimeStats(String brokerAddr)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException, MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup)
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, 
topic);
+    }
+
+    @Override
+    public ClusterInfo examineBrokerClusterInfo()
+        throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
+    }
+
+    @Override
+    public TopicRouteData examineTopicRouteInfo(String topic)
+        throws RemotingException, MQClientException, InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
+    }
+
+    @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String 
consumerGroup)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingException, 
MQClientException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
+    }
+
+    @Override
+    public ProducerConnection examineProducerConnectionInfo(String 
producerGroup, String topic)
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup,
 topic);
+    }
+
+    @Override
+    public List<String> getNameServerAddressList() {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
+    }
+
+    @Override
+    public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
+        throws RemotingCommandException, RemotingConnectException, 
RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQClientException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, 
brokerName);
+    }
+
+    @Override
+    public void putKVConfig(String namespace, String key, String value) {
+        MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, 
value);
+    }
+
+    @Override
+    public String getKVConfig(String namespace, String key)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, 
key);
+    }
+
+    @Override
+    public KVTable getKVListByNamespace(String namespace)
+        throws RemotingException, MQClientException, InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
+    }
+
+    @Override
+    public void deleteTopicInBroker(Set<String> addrs, String topic)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
+        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, 
topic);
+    }
+
+    @Override
+    public void deleteTopicInNameServer(Set<String> addrs, String topic)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, 
topic);
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, 
groupName);
+    }
+
+    @Override
+    public void createAndUpdateKvConfig(String namespace, String key, String 
value)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, 
value);
+    }
+
+    @Override
+    public void deleteKvConfig(String namespace, String key)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
+    }
+
+    @Override
+    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, 
String topic, long timestamp,
+        boolean force) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup,
 topic, timestamp, force);
+    }
+
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp,
+        boolean isForce) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, 
timestamp, isForce);
+    }
+
+    @Override
+    public void resetOffsetNew(String consumerGroup, String topic, long 
timestamp)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, 
topic, timestamp);
+    }
+
+    @Override
+    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, 
String group,
+        String clientAddr) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, 
group, clientAddr);
+    }
+
+    @Override
+    public void createOrUpdateOrderConf(String key, String value, boolean 
isCluster)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, 
value, isCluster);
+    }
+
+    @Override
+    public GroupList queryTopicConsumeByWho(String topic)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingException, 
MQClientException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueue(String cluster)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException,
+        InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueueByAddr(String addr)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQClientException,
+        InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
+    }
+
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, 
String clientId, boolean jstack)
+        throws RemotingException, MQClientException, InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, 
clientId, jstack);
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String 
consumerGroup, String clientId,
+        String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, 
clientId, msgId);
+    }
+
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg)
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
+    }
+
+    @Override
+    public void cloneGroupOffset(String srcGroup, String destGroup, String 
topic, boolean isOffline)
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, 
destGroup, topic, isOffline);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws 
MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, 
queueNum);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag)
+        throws MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, 
queueNum, topicSysFlag);
+    }
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, 
timestamp);
+    }
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
+    }
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
+    }
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
+        return 
MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
+    }
+
+    @Override
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
+        throws MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, 
key, maxNum, begin, end);
+    }
+
+    @Override
+    @Deprecated
+    public void start() throws MQClientException {
+        throw new IllegalStateException("thisMethod is deprecated.use 
org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
+    }
+
+    @Override
+    @Deprecated
+    public void shutdown() {
+        throw new IllegalStateException("thisMethod is deprecated.use 
org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
+    }
+
+    // below is 3.2.6->3.5.8 updated
+
+    @Override
+    public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
+        String group) throws InterruptedException, MQBrokerException, 
RemotingException, MQClientException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
+    }
+
+    //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a 
day
+    //next version we will remove it
+    //https://issues.apache.org/jira/browse/ROCKETMQ-111
+    //https://github.com/apache/incubator-rocketmq/pull/69
+    @Override
+    public MessageExt viewMessage(String topic,
+        String msgId) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
+        logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} 
msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
+        try {
+            return viewMessage(msgId);
+        }
+        catch (Exception e) {
+        }
+        MQAdminImpl mqAdminImpl = 
MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
+        QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, 
msgId, 32,
+            MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 
* 60 * 60 * 13L, Long.MAX_VALUE, true).get();
+        if (qr != null && qr.getMessageList() != null && 
qr.getMessageList().size() > 0) {
+            return qr.getMessageList().get(0);
+        }
+        else {
+            return null;
+        }
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String 
consumerGroup, String clientId, String topic,
+        String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, 
clientId, topic, msgId);
+    }
+
+    @Override
+    public Properties getBrokerConfig(
+        String brokerAddr) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, 
UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
+    }
+
+    @Override
+    public TopicList fetchTopicsByCLuster(
+        String clusterName) throws RemotingException, MQClientException, 
InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
+    }
+
+    @Override
+    public boolean cleanUnusedTopic(
+        String cluster) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
+    }
+
+    @Override
+    public boolean cleanUnusedTopicByAddr(
+        String addr) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
+    }
+
+    @Override
+    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String 
statsName,
+        String statsKey) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, 
statsName, statsKey);
+    }
+
+    @Override
+    public Set<String> getClusterList(
+        String topic) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
+    }
+
+    @Override
+    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, 
boolean isOrder,
+        long timeoutMillis) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
InterruptedException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, 
isOrder, timeoutMillis);
+    }
+
+    @Override
+    public Set<String> getTopicClusterList(
+        String topic) throws InterruptedException, MQBrokerException, 
MQClientException, RemotingException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
+    }
+
+    @Override
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
+        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, 
timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
+        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        return 
MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, 
timeoutMillis);
+    }
+
+    @Override
+    public void updateConsumeOffset(String brokerAddr, String consumeGroup, 
MessageQueue mq,
+        long offset) throws RemotingException, InterruptedException, 
MQBrokerException {
+        
MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, 
consumeGroup, mq, offset);
+    }
+
+    // 4.0.0 added
+    @Override public void updateNameServerConfig(Properties properties,
+        List<String> list) throws InterruptedException, 
RemotingConnectException, UnsupportedEncodingException, 
RemotingSendRequestException, RemotingTimeoutException, MQClientException, 
MQBrokerException {
+
+    }
+
+    @Override public Map<String, Properties> getNameServerConfig(
+        List<String> list) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQClientException, UnsupportedEncodingException {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
new file mode 100644
index 0000000..e914e6c
--- /dev/null
+++ 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.client;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.joor.Reflect;
+
+public class MQAdminInstance {
+    private static final ThreadLocal<DefaultMQAdminExt> 
MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
+    private static final ThreadLocal<Integer> INIT_COUNTER = new 
ThreadLocal<Integer>();
+
+    public static MQAdminExt threadLocalMQAdminExt() {
+        DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+        if (defaultMQAdminExt == null) {
+            throw new IllegalStateException("defaultMQAdminExt should be init 
before you get this");
+        }
+        return defaultMQAdminExt;
+    }
+
+    public static RemotingClient threadLocalRemotingClient() {
+        MQClientInstance mqClientInstance = threadLocalMqClientInstance();
+        MQClientAPIImpl mQClientAPIImpl = 
Reflect.on(mqClientInstance).get("mQClientAPIImpl");
+        return Reflect.on(mQClientAPIImpl).get("remotingClient");
+    }
+
+    public static MQClientInstance threadLocalMqClientInstance() {
+        DefaultMQAdminExtImpl defaultMQAdminExtImpl = 
Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
+        return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
+    }
+
+    public static void initMQAdminInstance(long timeoutMillis) throws 
MQClientException {
+        Integer nowCount = INIT_COUNTER.get();
+        if (nowCount == null) {
+            DefaultMQAdminExt defaultMQAdminExt;
+            if (timeoutMillis > 0) {
+                defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
+            }
+            else {
+                defaultMQAdminExt = new DefaultMQAdminExt();
+            }
+            
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+            defaultMQAdminExt.start();
+            MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
+            INIT_COUNTER.set(1);
+        }
+        else {
+            INIT_COUNTER.set(nowCount + 1);
+        }
+
+    }
+
+    public static void destroyMQAdminInstance() {
+        Integer nowCount = INIT_COUNTER.get() - 1;
+        if (nowCount > 0) {
+            INIT_COUNTER.set(nowCount);
+            return;
+        }
+        MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+        if (mqAdminExt != null) {
+            mqAdminExt.shutdown();
+            MQ_ADMIN_EXT_THREAD_LOCAL.remove();
+            INIT_COUNTER.remove();
+        }
+    }
+}

Reply via email to