http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
new file mode 100644
index 0000000..32af402
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.rocketmq.broker.plugin;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+/**
+ * Immutable holder bundling the four collaborators supplied at construction:
+ * the message store configuration, the broker statistics manager, the
+ * message-arriving listener and the broker configuration.
+ *
+ * <p>The references are stored exactly as supplied (no copying and no null
+ * checks) and handed back unchanged by the getters. All fields are
+ * {@code final} because nothing in this class ever reassigns them.
+ */
+public class MessageStorePluginContext {
+    private final MessageStoreConfig messageStoreConfig;
+    private final BrokerStatsManager brokerStatsManager;
+    private final MessageArrivingListener messageArrivingListener;
+    private final BrokerConfig brokerConfig;
+
+    /**
+     * Creates a context around the given collaborators.
+     *
+     * @param messageStoreConfig      value returned by {@link #getMessageStoreConfig()}
+     * @param brokerStatsManager      value returned by {@link #getBrokerStatsManager()}
+     * @param messageArrivingListener value returned by {@link #getMessageArrivingListener()}
+     * @param brokerConfig            value returned by {@link #getBrokerConfig()}
+     */
+    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
+                                     BrokerStatsManager brokerStatsManager,
+                                     MessageArrivingListener messageArrivingListener,
+                                     BrokerConfig brokerConfig) {
+        this.messageStoreConfig = messageStoreConfig;
+        this.brokerStatsManager = brokerStatsManager;
+        this.messageArrivingListener = messageArrivingListener;
+        this.brokerConfig = brokerConfig;
+    }
+
+    /** @return the message store configuration given to the constructor */
+    public MessageStoreConfig getMessageStoreConfig() {
+        return messageStoreConfig;
+    }
+
+    /** @return the broker statistics manager given to the constructor */
+    public BrokerStatsManager getBrokerStatsManager() {
+        return brokerStatsManager;
+    }
+
+    /** @return the message-arriving listener given to the constructor */
+    public MessageArrivingListener getMessageArrivingListener() {
+        return messageArrivingListener;
+    }
+
+    /** @return the broker configuration given to the constructor */
+    public BrokerConfig getBrokerConfig() {
+        return brokerConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
new file mode 100644
index 0000000..3cf28b3
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.DBMsgConstants;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.utils.ChannelUtil;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+
+/**
+ * @author shijia.wxr
+ */
+public abstract class AbstractSendMessageProcessor implements 
NettyRequestProcessor {
+    /** Logger obtained under the name {@code LoggerName.BROKER_LOGGER_NAME}. */
+    protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    /**
+     * Constant {@code 1}; not referenced inside this class.
+     * NOTE(review): the name suggests a dead-letter-queue count per group - confirm against subclasses.
+     */
+    protected static final int DLQ_NUMS_PER_GROUP = 1;
+    /** Broker controller supplied to the constructor. */
+    protected final BrokerController brokerController;
+    /** Random source seeded with the wall-clock time at construction; used to pick a queue when the request carries a negative queue id. */
+    protected final Random random = new Random(System.currentTimeMillis());
+    /** Socket address built in the constructor from the broker IP1 and the Netty listen port. */
+    protected final SocketAddress storeHost;
+    /** Send-message hooks; {@code null} until {@code registerSendMessageHook} is called. */
+    private List<SendMessageHook> sendMessageHookList;
+
+
+    public AbstractSendMessageProcessor(final BrokerController 
brokerController) {
+        this.brokerController = brokerController;
+        this.storeHost =
+                new 
InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), 
brokerController
+                        .getNettyServerConfig().getListenPort());
+    }
+
+    protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
+                                                 SendMessageRequestHeader 
requestHeader) {
+        if (!this.hasSendMessageHook()) {
+            return null;
+        }
+        SendMessageContext mqtraceContext;
+        mqtraceContext = new SendMessageContext();
+        mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
+        mqtraceContext.setTopic(requestHeader.getTopic());
+        mqtraceContext.setMsgProps(requestHeader.getProperties());
+        
mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
+        
mqtraceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
+        mqtraceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
+
+        Map<String, String> properties = 
MessageDecoder.string2messageProperties(requestHeader.getProperties());
+        String uniqueKey = 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+        properties.put(MessageConst.PROPERTY_MSG_REGION, 
this.brokerController.getBrokerConfig().getRegionId());
+        properties.put(MessageConst.PROPERTY_TRACE_SWITCH, 
String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+        
requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));
+
+
+        if (uniqueKey == null) {
+            uniqueKey = "";
+        }
+        mqtraceContext.setMsgUniqueKey(uniqueKey);
+        return mqtraceContext;
+    }
+
+    /**
+     * @return {@code true} when a hook list has been registered and it contains
+     *         at least one hook; {@code false} when the list is {@code null} or empty
+     */
+    public boolean hasSendMessageHook() {
+        if (sendMessageHookList == null) {
+            return false;
+        }
+        return !this.sendMessageHookList.isEmpty();
+    }
+
+    protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext 
ctx,
+                                                  final 
SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig 
topicConfig) {
+        int queueIdInt = requestHeader.getQueueId();
+        if (queueIdInt < 0) {
+            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 
topicConfig.getWriteQueueNums();
+        }
+        int sysFlag = requestHeader.getSysFlag();
+
+        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        }
+
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(requestHeader.getTopic());
+        msgInner.setBody(body);
+        msgInner.setFlag(requestHeader.getFlag());
+        MessageAccessor.setProperties(msgInner,
+                
MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+        msgInner.setPropertiesString(requestHeader.getProperties());
+        
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
+                msgInner.getTags()));
+
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setSysFlag(sysFlag);
+        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+        msgInner.setBornHost(ctx.channel().remoteAddress());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 
0 : requestHeader
+                .getReconsumeTimes());
+        return msgInner;
+    }
+
+    /** @return the store host address computed in the constructor */
+    public SocketAddress getStoreHost() {
+        return this.storeHost;
+    }
+
+    /**
+     * Applies size limits to an incoming message. The first violated limit wins:
+     * <ol>
+     *   <li>topic length greater than {@code Byte.MAX_VALUE};</li>
+     *   <li>non-null properties string longer than {@code Short.MAX_VALUE};</li>
+     *   <li>body longer than {@code DBMsgConstants.MAX_BODY_SIZE} (this case also sets a remark).</li>
+     * </ol>
+     * On a violation the response code is set to {@code MESSAGE_ILLEGAL}; otherwise
+     * the response is not modified. The request body is dereferenced without a
+     * null check.
+     *
+     * @param ctx           channel context, used only for logging the remote IP
+     * @param requestHeader header providing topic and properties
+     * @param request       request providing the body
+     * @param response      response to mark on violation
+     * @return always the {@code response} argument
+     */
+    protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
+                                              final SendMessageRequestHeader requestHeader,
+                                              RemotingCommand request,
+                                              final RemotingCommand response) {
+        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
+            log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+        } else if (requestHeader.getProperties() != null
+                && requestHeader.getProperties().length() > Short.MAX_VALUE) {
+            log.warn("putMessage message properties length too long "
+                    + requestHeader.getProperties().length());
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+        } else if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
+            log.warn(" topic {}  msg body size {}  from {}", requestHeader.getTopic(),
+                    request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
+            response.setRemark("msg body must be less 64KB");
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+        }
+        return response;
+    }
+
+    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
+                                       final SendMessageRequestHeader 
requestHeader, final RemotingCommand response) {
+        if 
(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
+                && 
this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic()))
 {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the broker[" + 
this.brokerController.getBrokerConfig().getBrokerIP1()
+                    + "] sending message is forbidden");
+            return response;
+        }
+        if 
(!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic()))
 {
+            String errorMsg =
+                    "the topic[" + requestHeader.getTopic() + "] is conflict 
with system reserved words.";
+            log.warn(errorMsg);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorMsg);
+            return response;
+        }
+
+        TopicConfig topicConfig =
+                
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        if (null == topicConfig) {
+            int topicSysFlag = 0;
+            if (requestHeader.isUnitMode()) {
+                if 
(requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+                } else {
+                    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
+                }
+            }
+
+            log.warn("the topic " + requestHeader.getTopic() + " not exist, 
producer: "
+                    + ctx.channel().remoteAddress());
+            topicConfig = 
this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
+                    requestHeader.getTopic(), //
+                    requestHeader.getDefaultTopic(), //
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
+                    requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
+
+            if (null == topicConfig) {
+                if 
(requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    topicConfig =
+                            
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+                                    requestHeader.getTopic(), 1, 
PermName.PERM_WRITE | PermName.PERM_READ,
+                                    topicSysFlag);
+                }
+            }
+
+            if (null == topicConfig) {
+                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+                response.setRemark("topic[" + requestHeader.getTopic() + "] 
not exist, apply first please!"
+                        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+                return response;
+            }
+        }
+
+        int queueIdInt = requestHeader.getQueueId();
+        int idValid = Math.max(topicConfig.getWriteQueueNums(), 
topicConfig.getReadQueueNums());
+        if (queueIdInt >= idValid) {
+            String errorInfo = String.format("request queueId[%d] is illagal, 
%s Producer: %s",
+                    queueIdInt,
+                    topicConfig.toString(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+            log.warn(errorInfo);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorInfo);
+
+            return response;
+        }
+        return response;
+    }
+
+    /**
+     * Replaces the current hook list with the given one. The list is stored by
+     * reference, not copied.
+     *
+     * @param sendMessageHookList hooks to run around message sends; may be {@code null},
+     *                            which {@code hasSendMessageHook()} treats as "no hooks"
+     */
+    public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
+        this.sendMessageHookList = sendMessageHookList;
+    }
+
+    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand 
request,
+                              final RemotingCommand response) {
+        if (!request.isOnewayRPC()) {
+            try {
+                ctx.writeAndFlush(response);
+            } catch (Throwable e) {
+                log.error("SendMessageProcessor process request over, but 
response failed", e);
+                log.error(request.toString());
+                log.error(response.toString());
+            }
+        }
+    }
+
+    /**
+     * Runs every registered hook's {@code sendMessageBefore} callback.
+     *
+     * <p>For each hook the request header is parsed afresh; when parsing yields a
+     * header, the context is refreshed from it (producer group, topic, body
+     * length, properties, born host, broker address, queue id) before the hook
+     * runs, and the properties the hook left in the context are copied back to
+     * that header afterwards. When parsing yields {@code null} the hook is still
+     * invoked with the context as it is.
+     *
+     * <p>Hooks are best-effort: anything thrown while preparing or running one
+     * hook is logged and the remaining hooks still run. Previously such failures
+     * were swallowed without a trace, and a {@code null} header caused a hidden
+     * {@code NullPointerException} after every hook invocation.
+     *
+     * @param ctx     channel context, used for the producer's remote address
+     * @param request the send request; its body is dereferenced when a header is parsed
+     * @param context trace context shared by all hooks
+     */
+    public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
+                                             SendMessageContext context) {
+        if (!hasSendMessageHook()) {
+            return;
+        }
+
+        for (SendMessageHook hook : this.sendMessageHookList) {
+            try {
+                final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
+
+                if (null != requestHeader) {
+                    context.setProducerGroup(requestHeader.getProducerGroup());
+                    context.setTopic(requestHeader.getTopic());
+                    context.setBodyLength(request.getBody().length);
+                    context.setMsgProps(requestHeader.getProperties());
+                    context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    context.setBrokerAddr(this.brokerController.getBrokerAddr());
+                    context.setQueueId(requestHeader.getQueueId());
+                }
+
+                hook.sendMessageBefore(context);
+
+                // Only write back when there is a header to write to.
+                if (null != requestHeader) {
+                    requestHeader.setProperties(context.getMsgProps());
+                }
+            } catch (Throwable e) {
+                // Deliberately not rethrown: a failing hook must not block the send path.
+                log.warn("executeSendMessageHookBefore failed, continuing with the next hook", e);
+            }
+        }
+    }
+
+    protected SendMessageRequestHeader parseRequestHeader(RemotingCommand 
request)
+            throws RemotingCommandException {
+
+        SendMessageRequestHeaderV2 requestHeaderV2 = null;
+        SendMessageRequestHeader requestHeader = null;
+        switch (request.getCode()) {
+            case RequestCode.SEND_MESSAGE_V2:
+                requestHeaderV2 =
+                        (SendMessageRequestHeaderV2) request
+                                
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+            case RequestCode.SEND_MESSAGE:
+                if (null == requestHeaderV2) {
+                    requestHeader =
+                            (SendMessageRequestHeader) request
+                                    
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
+                } else {
+                    requestHeader = 
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
+                }
+            default:
+                break;
+        }
+        return requestHeader;
+    }
+
+    /**
+     * Runs every registered hook's {@code sendMessageAfter} callback.
+     *
+     * <p>When a response is available, the context is first updated with the
+     * response code and remark and, if the response carries a custom header, with
+     * the message id, queue id and queue offset from that header. A response
+     * without a custom header no longer prevents the hooks from running:
+     * previously the missing header raised a {@code NullPointerException} that was
+     * silently swallowed, so the after-hooks never saw such responses.
+     *
+     * <p>Hooks are best-effort: anything thrown for one hook is logged and the
+     * remaining hooks still run.
+     *
+     * @param response response produced for the send request; may be {@code null}
+     * @param context  trace context shared by all hooks
+     */
+    public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
+        if (!hasSendMessageHook()) {
+            return;
+        }
+
+        for (SendMessageHook hook : this.sendMessageHookList) {
+            try {
+                if (response != null) {
+                    final SendMessageResponseHeader responseHeader =
+                            (SendMessageResponseHeader) response.readCustomHeader();
+                    // The header is absent when the response was built without one.
+                    if (responseHeader != null) {
+                        context.setMsgId(responseHeader.getMsgId());
+                        context.setQueueId(responseHeader.getQueueId());
+                        context.setQueueOffset(responseHeader.getQueueOffset());
+                    }
+                    context.setCode(response.getCode());
+                    context.setErrorMsg(response.getRemark());
+                }
+                hook.sendMessageAfter(context);
+            } catch (Throwable e) {
+                // Deliberately not rethrown: a failing hook must not affect the send result.
+                log.warn("executeSendMessageHookAfter failed, continuing with the next hook", e);
+            }
+        }
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
new file mode 100644
index 0000000..c1241bb
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -0,0 +1,1212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.header.*;
+import 
org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsSnapshot;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class AdminBrokerProcessor implements NettyRequestProcessor {
+    /** Logger obtained under the name {@code LoggerName.BROKER_LOGGER_NAME}. */
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    /** Broker controller supplied to the constructor; the handlers reach broker components through it. */
+    private final BrokerController brokerController;
+
+    /**
+     * @param brokerController controller stored for use by the request handlers
+     */
+    public AdminBrokerProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) throws RemotingCommandException {
+        switch (request.getCode()) {
+            case RequestCode.UPDATE_AND_CREATE_TOPIC:
+                return this.updateAndCreateTopic(ctx, request);
+            case RequestCode.DELETE_TOPIC_IN_BROKER:
+                return this.deleteTopic(ctx, request);
+            case RequestCode.GET_ALL_TOPIC_CONFIG:
+                return this.getAllTopicConfig(ctx, request);
+            case RequestCode.UPDATE_BROKER_CONFIG:
+                return this.updateBrokerConfig(ctx, request);
+            case RequestCode.GET_BROKER_CONFIG:
+                return this.getBrokerConfig(ctx, request);
+            case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
+                return this.searchOffsetByTimestamp(ctx, request);
+            case RequestCode.GET_MAX_OFFSET:
+                return this.getMaxOffset(ctx, request);
+            case RequestCode.GET_MIN_OFFSET:
+                return this.getMinOffset(ctx, request);
+            case RequestCode.GET_EARLIEST_MSG_STORETIME:
+                return this.getEarliestMsgStoretime(ctx, request);
+            case RequestCode.GET_BROKER_RUNTIME_INFO:
+                return this.getBrokerRuntimeInfo(ctx, request);
+            case RequestCode.LOCK_BATCH_MQ:
+                return this.lockBatchMQ(ctx, request);
+            case RequestCode.UNLOCK_BATCH_MQ:
+                return this.unlockBatchMQ(ctx, request);
+            case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:
+                return this.updateAndCreateSubscriptionGroup(ctx, request);
+            case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG:
+                return this.getAllSubscriptionGroup(ctx, request);
+            case RequestCode.DELETE_SUBSCRIPTIONGROUP:
+                return this.deleteSubscriptionGroup(ctx, request);
+            case RequestCode.GET_TOPIC_STATS_INFO:
+                return this.getTopicStatsInfo(ctx, request);
+            case RequestCode.GET_CONSUMER_CONNECTION_LIST:
+                return this.getConsumerConnectionList(ctx, request);
+            case RequestCode.GET_PRODUCER_CONNECTION_LIST:
+                return this.getProducerConnectionList(ctx, request);
+            case RequestCode.GET_CONSUME_STATS:
+                return this.getConsumeStats(ctx, request);
+            case RequestCode.GET_ALL_CONSUMER_OFFSET:
+                return this.getAllConsumerOffset(ctx, request);
+            case RequestCode.GET_ALL_DELAY_OFFSET:
+                return this.getAllDelayOffset(ctx, request);
+            case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
+                return this.resetOffset(ctx, request);
+            case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS:
+                return this.getConsumerStatus(ctx, request);
+            case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
+                return this.queryTopicConsumeByWho(ctx, request);
+            case RequestCode.REGISTER_FILTER_SERVER:
+                return this.registerFilterServer(ctx, request);
+            case RequestCode.QUERY_CONSUME_TIME_SPAN:
+                return this.queryConsumeTimeSpan(ctx, request);
+            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
+                return this.getSystemTopicListFromBroker(ctx, request);
+            case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
+                return this.cleanExpiredConsumeQueue();
+            case RequestCode.CLEAN_UNUSED_TOPIC:
+                return this.cleanUnusedTopic();
+            case RequestCode.GET_CONSUMER_RUNNING_INFO:
+                return this.getConsumerRunningInfo(ctx, request);
+            case RequestCode.QUERY_CORRECTION_OFFSET:
+                return this.queryCorrectionOffset(ctx, request);
+            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
+                return this.consumeMessageDirectly(ctx, request);
+            case RequestCode.CLONE_GROUP_OFFSET:
+                return this.cloneGroupOffset(ctx, request);
+            case RequestCode.VIEW_BROKER_STATS_DATA:
+                return ViewBrokerStatsData(ctx, request);
+            case RequestCode.GET_BROKER_CONSUME_STATS:
+                return fetchAllConsumeStatsInBroker(ctx, request);
+            default:
+                break;
+        }
+
+        return null;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Creates or updates a topic from the values in the request header.
+     *
+     * <p>A topic whose name equals the broker cluster name is rejected with
+     * {@code SYSTEM_ERROR}. Otherwise a {@code SUCCESS} response is written to the
+     * channel <em>before</em> the topic configuration is applied and the broker
+     * re-registers; the method then returns {@code null} because the response has
+     * already been sent here.
+     * NOTE(review): presumably a {@code null} return tells the remoting layer not
+     * to send a second response - confirm.
+     *
+     * <p>A failure of the early response write is logged (it used to be swallowed
+     * silently) and does not stop the topic update.
+     *
+     * @param ctx     channel context of the caller
+     * @param request request carrying a {@code CreateTopicRequestHeader}
+     * @return the error response on a name conflict, otherwise {@code null}
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final CreateTopicRequestHeader requestHeader =
+                (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
+        log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
+            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
+            log.warn(errorMsg);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorMsg);
+            return response;
+        }
+
+        // Acknowledge first; the update and re-registration below happen after the reply.
+        try {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            response.markResponseType();
+            response.setRemark(null);
+            ctx.writeAndFlush(response);
+        } catch (Exception e) {
+            // Best-effort acknowledgement: keep going, but leave a trace of the failure.
+            log.warn("updateAndCreateTopic failed to write the early response to the client", e);
+        }
+
+        TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
+        topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
+        topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
+        topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
+        topicConfig.setPerm(requestHeader.getPerm());
+        // A missing sys flag in the header is stored as 0.
+        topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+
+        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+        this.brokerController.registerBrokerAll(false, true);
+        return null;
+    }
+
+    /**
+     * Deletes the configuration of the topic named in the request header, then
+     * asks the message store to clean unused topics, passing it the key set of
+     * the topic config table as it stands after the deletion.
+     *
+     * @param ctx     channel context of the caller, used for logging
+     * @param request request carrying a {@code DeleteTopicRequestHeader}
+     * @return a {@code SUCCESS} response with no remark
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final DeleteTopicRequestHeader header =
+                (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
+
+        log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        this.brokerController.getTopicConfigManager().deleteTopicConfig(header.getTopic());
+        this.brokerController.getMessageStore().cleanUnusedTopic(
+                this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
+
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Returns the encoded topic configuration of this broker as the response body.
+     *
+     * @param ctx     channel context of the caller, used for logging
+     * @param request the incoming request (not read)
+     * @return {@code SUCCESS} with the encoded configuration as body;
+     *         {@code SYSTEM_ERROR} when the encoded configuration is {@code null}
+     *         or empty, or when it cannot be converted to bytes
+     */
+    private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand reply =
+                RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
+
+        final String encoded = this.brokerController.getTopicConfigManager().encode();
+        if (encoded == null || encoded.length() == 0) {
+            log.error("No topic in this broker, client: " + ctx.channel().remoteAddress());
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("No topic in this broker");
+            return reply;
+        }
+
+        try {
+            reply.setBody(encoded.getBytes(MixAll.DEFAULT_CHARSET));
+        } catch (UnsupportedEncodingException e) {
+            log.error("", e);
+
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("UnsupportedEncodingException " + e);
+            return reply;
+        }
+
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, 
RemotingCommand request) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+
+        log.info("updateBrokerConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            try {
+                String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+                Properties properties = MixAll.string2Properties(bodyStr);
+                if (properties != null) {
+                    log.info("updateBrokerConfig, new config: " + properties + 
" client: " + ctx.channel().remoteAddress());
+                    
this.brokerController.getConfiguration().update(properties);
+                    if (properties.containsKey("brokerPermission")) {
+                        this.brokerController.registerBrokerAll(false, false);
+                        
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+                    }
+                } else {
+                    log.error("string2Properties error");
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("string2Properties error");
+                    return response;
+                }
+            } catch (UnsupportedEncodingException e) {
+                log.error("", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the broker's formatted configuration as the response body and its data version
+     * JSON in the response header.
+     * <p>
+     * A null or empty configuration string is not an error: the body is left unset and the
+     * response is still {@code SUCCESS}.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request the incoming command (not inspected)
+     * @return {@code SUCCESS}, or {@code SYSTEM_ERROR} when the config cannot be converted to bytes
+     */
+    private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
+        final GetBrokerConfigResponseHeader replyHeader = (GetBrokerConfigResponseHeader) reply.readCustomHeader();
+
+        final String allConfigs = this.brokerController.getConfiguration().getAllConfigsFormatString();
+        final boolean hasConfigs = allConfigs != null && allConfigs.length() > 0;
+        if (hasConfigs) {
+            try {
+                reply.setBody(allConfigs.getBytes(MixAll.DEFAULT_CHARSET));
+            } catch (UnsupportedEncodingException e) {
+                log.error("", e);
+
+                reply.setCode(ResponseCode.SYSTEM_ERROR);
+                reply.setRemark("UnsupportedEncodingException " + e);
+                return reply;
+            }
+        }
+
+        replyHeader.setVersion(this.brokerController.getConfiguration().getDataVersionJson());
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Looks up the queue offset for a timestamp via the message store's
+     * {@code getOffsetInQueueByTime} and returns it in the response header.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link SearchOffsetRequestHeader} (topic, queue id, timestamp)
+     * @return a {@code SUCCESS} response whose header holds the offset
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+        final SearchOffsetResponseHeader replyHeader = (SearchOffsetResponseHeader) reply.readCustomHeader();
+        final SearchOffsetRequestHeader header =
+                (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
+
+        final long found = this.brokerController.getMessageStore().getOffsetInQueueByTime(
+                header.getTopic(), header.getQueueId(), header.getTimestamp());
+        replyHeader.setOffset(found);
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Returns the value of the message store's {@code getMaxOffsetInQuque} for the requested
+     * topic and queue id in the response header.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link GetMaxOffsetRequestHeader}
+     * @return a {@code SUCCESS} response whose header holds the offset
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+        final GetMaxOffsetResponseHeader replyHeader = (GetMaxOffsetResponseHeader) reply.readCustomHeader();
+        final GetMaxOffsetRequestHeader header =
+                (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
+
+        final long maxOffset =
+                this.brokerController.getMessageStore().getMaxOffsetInQuque(header.getTopic(), header.getQueueId());
+        replyHeader.setOffset(maxOffset);
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Returns the value of the message store's {@code getMinOffsetInQuque} for the requested
+     * topic and queue id in the response header.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link GetMinOffsetRequestHeader}
+     * @return a {@code SUCCESS} response whose header holds the offset
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+        final GetMinOffsetResponseHeader replyHeader = (GetMinOffsetResponseHeader) reply.readCustomHeader();
+        final GetMinOffsetRequestHeader header =
+                (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+
+        final long minOffset =
+                this.brokerController.getMessageStore().getMinOffsetInQuque(header.getTopic(), header.getQueueId());
+        replyHeader.setOffset(minOffset);
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Returns the value of the message store's {@code getEarliestMessageTime} for the requested
+     * topic and queue id in the response header.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link GetEarliestMsgStoretimeRequestHeader}
+     * @return a {@code SUCCESS} response whose header holds the timestamp
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
+        final GetEarliestMsgStoretimeResponseHeader replyHeader =
+                (GetEarliestMsgStoretimeResponseHeader) reply.readCustomHeader();
+        final GetEarliestMsgStoretimeRequestHeader header =
+                (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
+
+        final long earliest =
+                this.brokerController.getMessageStore().getEarliestMessageTime(header.getTopic(), header.getQueueId());
+        replyHeader.setTimestamp(earliest);
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Returns the map produced by {@code prepareRuntimeInfo()} as an encoded {@link KVTable}
+     * in the response body.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request the incoming command (not inspected)
+     * @return a {@code SUCCESS} response carrying the encoded table
+     */
+    private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        final HashMap<String, String> info = this.prepareRuntimeInfo();
+        final KVTable table = new KVTable();
+        table.setTable(info);
+
+        reply.setBody(table.encode());
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Calls the rebalance lock manager's {@code tryLockBatch} for a consumer client and
+     * reports the returned set of message queues.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command whose body decodes to a {@link LockBatchRequestBody}
+     * @return a {@code SUCCESS} response whose body is the encoded {@link LockBatchResponseBody}
+     * @throws RemotingCommandException declared by the signature; not thrown directly in this method
+     */
+    private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final LockBatchRequestBody lockRequest = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
+
+        final Set<MessageQueue> granted = this.brokerController.getRebalanceLockManager().tryLockBatch(
+                lockRequest.getConsumerGroup(),
+                lockRequest.getMqSet(),
+                lockRequest.getClientId());
+
+        final LockBatchResponseBody lockResult = new LockBatchResponseBody();
+        lockResult.setLockOKMQSet(granted);
+
+        reply.setBody(lockResult.encode());
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Calls the rebalance lock manager's {@code unlockBatch} with the consumer group, message
+     * queues and client id listed in the request body.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command whose body decodes to an {@link UnlockBatchRequestBody}
+     * @return a response with code {@code SUCCESS} and a null remark
+     * @throws RemotingCommandException declared by the signature; not thrown directly in this method
+     */
+    private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final UnlockBatchRequestBody unlockRequest =
+                UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
+
+        this.brokerController.getRebalanceLockManager().unlockBatch(
+                unlockRequest.getConsumerGroup(),
+                unlockRequest.getMqSet(),
+                unlockRequest.getClientId());
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Passes the subscription group config decoded from the request body to the subscription
+     * group manager's {@code updateSubscriptionGroupConfig}.
+     * <p>
+     * A body that decodes to {@code null} is ignored and still answered with {@code SUCCESS}.
+     *
+     * @param ctx     channel context of the caller; only used to log the remote address
+     * @param request command whose body decodes to a {@link SubscriptionGroupConfig}
+     * @return a response with code {@code SUCCESS} and a null remark
+     * @throws RemotingCommandException declared by the signature; not thrown directly in this method
+     */
+    private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final SubscriptionGroupConfig groupConfig =
+                RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
+        if (null != groupConfig) {
+            this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+        }
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Returns the encoded subscription group configuration of this broker as the response body.
+     *
+     * @param ctx     channel context of the caller; only used for the error log
+     * @param request the incoming command (not inspected)
+     * @return {@code SUCCESS} with the encoded config as body, or {@code SYSTEM_ERROR} when the
+     *         encoded config is null/empty or cannot be converted to bytes
+     * @throws RemotingCommandException declared by the signature; not thrown directly in this method
+     */
+    private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        final String encoded = this.brokerController.getSubscriptionGroupManager().encode();
+        if (encoded == null || encoded.length() == 0) {
+            log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress());
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("No subscription group in this broker");
+            return reply;
+        }
+
+        try {
+            reply.setBody(encoded.getBytes(MixAll.DEFAULT_CHARSET));
+        } catch (UnsupportedEncodingException e) {
+            log.error("", e);
+
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("UnsupportedEncodingException " + e);
+            return reply;
+        }
+
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Deletes the subscription group config for the group named in the request header.
+     *
+     * @param ctx     channel context of the caller; only used to log the remote address
+     * @param request command carrying a {@link DeleteSubscriptionGroupRequestHeader}
+     * @return a response with code {@code SUCCESS} and a null remark
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final DeleteSubscriptionGroupRequestHeader header =
+                (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
+
+        log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(header.getGroupName());
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, 
RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        final GetTopicStatsInfoRequestHeader requestHeader =
+                (GetTopicStatsInfoRequestHeader) 
request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
+
+        final String topic = requestHeader.getTopic();
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+        if (null == topicConfig) {
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark("topic[" + topic + "] not exist");
+            return response;
+        }
+
+        TopicStatsTable topicStatsTable = new TopicStatsTable();
+        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
+            MessageQueue mq = new MessageQueue();
+            mq.setTopic(topic);
+            
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+            mq.setQueueId(i);
+
+            TopicOffset topicOffset = new TopicOffset();
+            long min = 
this.brokerController.getMessageStore().getMinOffsetInQuque(topic, i);
+            if (min < 0)
+                min = 0;
+
+            long max = 
this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+            if (max < 0)
+                max = 0;
+
+            long timestamp = 0;
+            if (max > 0) {
+                timestamp = 
this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max 
- 1);
+            }
+
+            topicOffset.setMinOffset(min);
+            topicOffset.setMaxOffset(max);
+            topicOffset.setLastUpdateTimestamp(timestamp);
+
+            topicStatsTable.getOffsetTable().put(mq, topicOffset);
+        }
+
+        byte[] body = topicStatsTable.encode();
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Lists the client connections of a consumer group.
+     * <p>
+     * The body carries the group's consume-from-where, consume type, message model and
+     * subscription table, plus one {@link Connection} per entry of its channel info table.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link GetConsumerConnectionListRequestHeader}
+     * @return {@code SUCCESS} with an encoded {@link ConsumerConnection}, or
+     *         {@code CONSUMER_NOT_ONLINE} when the consumer manager has no info for the group
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final GetConsumerConnectionListRequestHeader header =
+                (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
+
+        final ConsumerGroupInfo groupInfo =
+                this.brokerController.getConsumerManager().getConsumerGroupInfo(header.getConsumerGroup());
+        if (groupInfo == null) {
+            reply.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
+            reply.setRemark("the consumer group[" + header.getConsumerGroup() + "] not online");
+            return reply;
+        }
+
+        final ConsumerConnection connections = new ConsumerConnection();
+        connections.setConsumeFromWhere(groupInfo.getConsumeFromWhere());
+        connections.setConsumeType(groupInfo.getConsumeType());
+        connections.setMessageModel(groupInfo.getMessageModel());
+        connections.getSubscriptionTable().putAll(groupInfo.getSubscriptionTable());
+
+        for (Map.Entry<Channel, ClientChannelInfo> entry : groupInfo.getChannelInfoTable().entrySet()) {
+            final ClientChannelInfo client = entry.getValue();
+            final Connection connection = new Connection();
+            connection.setClientId(client.getClientId());
+            connection.setLanguage(client.getLanguage());
+            connection.setVersion(client.getVersion());
+            connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(client.getChannel()));
+
+            connections.getConnectionSet().add(connection);
+        }
+
+        reply.setBody(connections.encode());
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Lists the client connections of a producer group, one {@link Connection} per entry of the
+     * group's channel table.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link GetProducerConnectionListRequestHeader}
+     * @return {@code SUCCESS} with an encoded {@link ProducerConnection}, or {@code SYSTEM_ERROR}
+     *         when the producer manager has no channel table for the group
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final GetProducerConnectionListRequestHeader header =
+                (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
+
+        final ProducerConnection connections = new ProducerConnection();
+        final HashMap<Channel, ClientChannelInfo> channels =
+                this.brokerController.getProducerManager().getGroupChannelTable().get(header.getProducerGroup());
+        if (channels == null) {
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("the producer group[" + header.getProducerGroup() + "] not exist");
+            return reply;
+        }
+
+        for (Map.Entry<Channel, ClientChannelInfo> entry : channels.entrySet()) {
+            final ClientChannelInfo client = entry.getValue();
+            final Connection connection = new Connection();
+            connection.setClientId(client.getClientId());
+            connection.setLanguage(client.getLanguage());
+            connection.setVersion(client.getVersion());
+            connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(client.getChannel()));
+
+            connections.getConnectionSet().add(connection);
+        }
+
+        reply.setBody(connections.encode());
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Collects consume statistics for a consumer group.
+     * <p>
+     * When the request names no topic, every topic returned by the offset manager's
+     * {@code whichTopicByConsumer} for the group is inspected; otherwise only the named topic.
+     * For each read queue of each topic the broker offset, the consumer offset (both clamped to
+     * 0) and, when available, the store timestamp of the message at {@code consumerOffset - 1}
+     * are recorded. The group's TPS for each inspected topic is added to the total.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link GetConsumeStatsRequestHeader}
+     * @return a {@code SUCCESS} response whose body is the encoded {@link ConsumeStats}
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final GetConsumeStatsRequestHeader header =
+                (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
+
+        final ConsumeStats stats = new ConsumeStats();
+
+        final Set<String> topics;
+        if (UtilAll.isBlank(header.getTopic())) {
+            topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(header.getConsumerGroup());
+        } else {
+            topics = new HashSet<String>();
+            topics.add(header.getTopic());
+        }
+
+        for (String topic : topics) {
+            final TopicConfig config = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+            if (config == null) {
+                log.warn("consumeStats, topic config not exist, {}", topic);
+                continue;
+            }
+
+            // Skip the topic when the group has subscriptions, but none for this topic.
+            final SubscriptionData subscription =
+                    this.brokerController.getConsumerManager().findSubscriptionData(header.getConsumerGroup(), topic);
+            if (subscription == null
+                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(header.getConsumerGroup()) > 0) {
+                log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", header.getConsumerGroup(), topic);
+                continue;
+            }
+
+            for (int queueId = 0; queueId < config.getReadQueueNums(); queueId++) {
+                final MessageQueue queue = new MessageQueue();
+                queue.setTopic(topic);
+                queue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+                queue.setQueueId(queueId);
+
+                final OffsetWrapper wrapper = new OffsetWrapper();
+
+                // Negative offsets are clamped to zero.
+                final long brokerOffset =
+                        Math.max(0L, this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId));
+                final long consumerOffset = Math.max(0L,
+                        this.brokerController.getConsumerOffsetManager().queryOffset(header.getConsumerGroup(), topic, queueId));
+
+                wrapper.setBrokerOffset(brokerOffset);
+                wrapper.setConsumerOffset(consumerOffset);
+
+                // The timestamp is looked up for the message just before the consumer offset.
+                if (consumerOffset > 0) {
+                    final long lastTimestamp =
+                            this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, queueId, consumerOffset - 1);
+                    if (lastTimestamp > 0) {
+                        wrapper.setLastTimestamp(lastTimestamp);
+                    }
+                }
+
+                stats.getOffsetTable().put(queue, wrapper);
+            }
+
+            final double topicTps =
+                    this.brokerController.getBrokerStatsManager().tpsGroupGetNums(header.getConsumerGroup(), topic);
+            stats.setConsumeTps(topicTps + stats.getConsumeTps());
+        }
+
+        reply.setBody(stats.encode());
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Returns the encoded consumer offsets of this broker as the response body.
+     *
+     * @param ctx     channel context of the caller; only used for the error log
+     * @param request the incoming command (not inspected)
+     * @return {@code SUCCESS} with the encoded offsets as body, or {@code SYSTEM_ERROR} when the
+     *         encoded offsets are null/empty or cannot be converted to bytes
+     */
+    private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        final String encoded = this.brokerController.getConsumerOffsetManager().encode();
+        if (encoded == null || encoded.length() == 0) {
+            log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress());
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("No consumer offset in this broker");
+            return reply;
+        }
+
+        try {
+            reply.setBody(encoded.getBytes(MixAll.DEFAULT_CHARSET));
+        } catch (UnsupportedEncodingException e) {
+            log.error("get all consumer offset from master error.", e);
+
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("UnsupportedEncodingException " + e);
+            return reply;
+        }
+
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Returns the encoded state of the schedule message service as the response body.
+     * <p>
+     * NOTE(review): the message store is cast to {@link DefaultMessageStore} without a type
+     * check; confirm that no other store implementation can be configured here.
+     *
+     * @param ctx     channel context of the caller; only used for the error log
+     * @param request the incoming command (not inspected)
+     * @return {@code SUCCESS} with the encoded content as body, or {@code SYSTEM_ERROR} when the
+     *         encoded content is null/empty or cannot be converted to bytes
+     */
+    private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        final DefaultMessageStore store = (DefaultMessageStore) this.brokerController.getMessageStore();
+        final String encoded = store.getScheduleMessageService().encode();
+        if (encoded == null || encoded.length() == 0) {
+            log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress());
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("No delay offset in this broker");
+            return reply;
+        }
+
+        try {
+            reply.setBody(encoded.getBytes(MixAll.DEFAULT_CHARSET));
+        } catch (UnsupportedEncodingException e) {
+            log.error("get all delay offset from master error.", e);
+
+            reply.setCode(ResponseCode.SYSTEM_ERROR);
+            reply.setRemark("UnsupportedEncodingException " + e);
+            return reply;
+        }
+
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Forwards a reset-offset request to {@code Broker2Client.resetOffset}.
+     * <p>
+     * The call is flagged as coming from a C++ client only when the request's language is
+     * {@code LanguageCode.CPP}; any other value, including {@code null}, is treated as non-C++.
+     *
+     * @param ctx     channel context of the caller; only used to log the remote address
+     * @param request command carrying a {@link ResetOffsetRequestHeader}
+     * @return the response produced by {@code Broker2Client.resetOffset}
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final ResetOffsetRequestHeader requestHeader =
+                (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
+        log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
+                new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+                        requestHeader.getTimestamp(), requestHeader.isForce()});
+
+        // An enum switch throws NullPointerException on a null selector; an equality test is
+        // null-safe and needs no default branch.
+        final boolean isC = LanguageCode.CPP == request.getLanguage();
+
+        return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
+                requestHeader.getTimestamp(), requestHeader.isForce(), isC);
+    }
+
+    /**
+     * Forwards a get-consumer-status request to {@code Broker2Client.getConsumeStatus}.
+     *
+     * @param ctx     channel context of the caller; only used to log the remote address
+     * @param request command carrying a {@link GetConsumerStatusRequestHeader}
+     * @return the response produced by {@code Broker2Client.getConsumeStatus}
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final GetConsumerStatusRequestHeader header =
+                (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
+
+        final Object[] logArgs = new Object[]{
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), header.getTopic(), header.getGroup()};
+        log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", logArgs);
+
+        return this.brokerController.getBroker2Client().getConsumeStatus(
+                header.getTopic(), header.getGroup(), header.getClientAddr());
+    }
+
+    /**
+     * Lists the consumer groups associated with a topic.
+     * <p>
+     * The result is the set returned by the consumer manager's {@code queryTopicConsumeByWho},
+     * extended with the groups returned by the offset manager's {@code whichGroupByTopic}.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request command carrying a {@link QueryTopicConsumeByWhoRequestHeader}
+     * @return a {@code SUCCESS} response whose body is the encoded {@link GroupList}
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final QueryTopicConsumeByWhoRequestHeader header =
+                (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
+
+        final HashSet<String> consumers =
+                this.brokerController.getConsumerManager().queryTopicConsumeByWho(header.getTopic());
+
+        final Set<String> fromOffsets =
+                this.brokerController.getConsumerOffsetManager().whichGroupByTopic(header.getTopic());
+        if (fromOffsets != null && !fromOffsets.isEmpty()) {
+            consumers.addAll(fromOffsets);
+        }
+
+        final GroupList result = new GroupList();
+        result.setGroupList(consumers);
+
+        reply.setBody(result.encode());
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Registers the calling channel with the filter server manager under the filter server
+     * address from the request header, and answers with this broker's id and name.
+     *
+     * @param ctx     channel context of the caller; its channel is what gets registered
+     * @param request command carrying a {@link RegisterFilterServerRequestHeader}
+     * @return a {@code SUCCESS} response whose header holds the broker id and broker name
+     * @throws RemotingCommandException if decoding the request header fails
+     */
+    private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
+        final RegisterFilterServerResponseHeader replyHeader = (RegisterFilterServerResponseHeader) reply.readCustomHeader();
+        final RegisterFilterServerRequestHeader header =
+                (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
+
+        this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), header.getFilterServerAddr());
+
+        replyHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
+        replyHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, 
RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        QueryConsumeTimeSpanRequestHeader requestHeader =
+                (QueryConsumeTimeSpanRequestHeader) 
request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
+
+        final String topic = requestHeader.getTopic();
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+        if (null == topicConfig) {
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark("topic[" + topic + "] not exist");
+            return response;
+        }
+
+        List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>();
+        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
+            QueueTimeSpan timeSpan = new QueueTimeSpan();
+            MessageQueue mq = new MessageQueue();
+            mq.setTopic(topic);
+            
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+            mq.setQueueId(i);
+            timeSpan.setMessageQueue(mq);
+
+            long minTime = 
this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
+            timeSpan.setMinTimeStamp(minTime);
+
+            long max = 
this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+            long maxTime = 
this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max 
- 1);
+            timeSpan.setMaxTimeStamp(maxTime);
+
+            long consumeTime;
+            long consumerOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(
+                    requestHeader.getGroup(), topic, i);
+            if (consumerOffset > 0) {
+                consumeTime = 
this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, 
consumerOffset - 1);
+            } else {
+                consumeTime = minTime;
+            }
+            timeSpan.setConsumeTimeStamp(consumeTime);
+
+            long maxBrokerOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(),
 i);
+            if (consumerOffset < maxBrokerOffset) {
+                long nextTime = 
this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, 
consumerOffset);
+                timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
+            }
+            timeSpanSet.add(timeSpan);
+        }
+
+        QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new 
QueryConsumeTimeSpanBody();
+        queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet);
+        response.setBody(queryConsumeTimeSpanBody.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the topic config manager's system topics as an encoded {@link TopicList}.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request the incoming command (not inspected)
+     * @return a {@code SUCCESS} response whose body is the encoded topic list
+     * @throws RemotingCommandException declared by the signature; not thrown directly in this method
+     */
+    private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        final Set<String> systemTopics = this.brokerController.getTopicConfigManager().getSystemTopic();
+        final TopicList result = new TopicList();
+        result.setTopicList(systemTopics);
+
+        reply.setBody(result.encode());
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Invokes the message store's {@code cleanExpiredConsumerQueue}, logging at WARN level
+     * before and after the call.
+     *
+     * @return a response with code {@code SUCCESS} and a null remark
+     */
+    public RemotingCommand cleanExpiredConsumeQueue() {
+        log.warn("invoke cleanExpiredConsumeQueue start.");
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        this.brokerController.getMessageStore().cleanExpiredConsumerQueue();
+        log.warn("invoke cleanExpiredConsumeQueue end.");
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Invokes the message store's {@code cleanUnusedTopic} with the key set of the topic config
+     * table, logging at WARN level before and after the call.
+     *
+     * @return a response with code {@code SUCCESS} and a null remark
+     */
+    public RemotingCommand cleanUnusedTopic() {
+        log.warn("invoke cleanUnusedTopic start.");
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+
+        this.brokerController.getMessageStore().cleanUnusedTopic(
+                this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
+        log.warn("invoke cleanUnusedTopic end.");
+
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
+    /**
+     * Forwards a "get consumer running info" request to the addressed consumer client.
+     *
+     * <p>The consumer group and client id are taken from the decoded request header and the
+     * call is delegated to {@code callConsumer} with {@code RequestCode.GET_CONSUMER_RUNNING_INFO}.
+     *
+     * @param ctx channel context of the requester; not read by this method
+     * @param request the incoming command whose custom header names the target consumer
+     * @return whatever {@code callConsumer} returns
+     * @throws RemotingCommandException propagated from header decoding or from {@code callConsumer}
+     */
+    private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final GetConsumerRunningInfoRequestHeader header = (GetConsumerRunningInfoRequestHeader)
+            request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+
+        final String targetGroup = header.getConsumerGroup();
+        final String targetClientId = header.getClientId();
+        return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, targetGroup, targetClientId);
+    }
+
+    /**
+     * Computes per-queue correction offsets for a topic and returns them encoded in the response body.
+     *
+     * <p>The starting point is the map returned by {@code queryMinOffsetInAllGroup} for the
+     * topic and filter groups named in the header. For every queue that also appears in the
+     * compare group's offset map, the entry is replaced by {@code Long.MAX_VALUE} when the
+     * minimum offset is greater than the compare group's offset; otherwise it is left as is.
+     *
+     * <p>Queues present in the compare map but absent from the minimum-offset map are skipped.
+     * The previous code auto-unboxed the missing value and failed with a NullPointerException.
+     *
+     * @param ctx channel context of the requester; not read by this method
+     * @param request the incoming command carrying a {@code QueryCorrectionOffsetHeader}
+     * @return a response with code {@code SUCCESS} and the encoded {@code QueryCorrectionOffsetBody}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        QueryCorrectionOffsetHeader requestHeader =
+            (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
+
+        Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
+            .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
+
+        // NOTE(review): the three-argument queryOffset used elsewhere in this class is called as
+        // (group, topic, queueId); here the two-argument overload receives (topic, group).
+        // Confirm the parameter order against ConsumerOffsetManager before relying on this lookup.
+        Map<Integer, Long> compareOffset =
+            this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());
+
+        if (correctionOffset != null && compareOffset != null && !compareOffset.isEmpty()) {
+            for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
+                Integer queueId = entry.getKey();
+                Long minOffset = correctionOffset.get(queueId);
+                Long comparedOffset = entry.getValue();
+                if (minOffset == null || comparedOffset == null) {
+                    // Nothing to compare for this queue; avoid unboxing a null.
+                    continue;
+                }
+                if (minOffset > comparedOffset) {
+                    correctionOffset.put(queueId, Long.MAX_VALUE);
+                }
+            }
+        }
+
+        QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
+        body.setCorrectionOffsets(correctionOffset);
+        response.setBody(body.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Asks one consumer client to consume a specific stored message directly.
+     *
+     * <p>The broker name is added to the request's ext fields, the message is looked up in the
+     * store by the offset decoded from the header's message id, its bytes are copied into the
+     * request body, and the request is forwarded through {@code callConsumer} with
+     * {@code RequestCode.CONSUME_MESSAGE_DIRECTLY}. The store lookup result is always released.
+     *
+     * <p>If the store lookup yields {@code null}, a {@code SYSTEM_ERROR} response is returned
+     * instead of dereferencing it. If the message id cannot be decoded
+     * ({@code UnknownHostException}), the failure is logged and the request is still forwarded
+     * without a replaced body, as before.
+     *
+     * @param ctx channel context of the requester; not read by this method
+     * @param request the incoming command; its ext fields and body are modified in place
+     * @return the consumer's response, or an error response built by this method or {@code callConsumer}
+     * @throws RemotingCommandException propagated from header decoding or from {@code callConsumer}
+     */
+    private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
+            .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
+
+        request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
+        SelectMappedBufferResult selectMappedBufferResult = null;
+        try {
+            MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
+            selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());
+
+            // NOTE(review): presumably the store returns null when nothing is found at that
+            // offset; confirm against the MessageStore contract. Either way, never dereference null.
+            if (null == selectMappedBufferResult) {
+                final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark(String.format("The message <%s> not found in the message store", requestHeader.getMsgId()));
+                return response;
+            }
+
+            byte[] body = new byte[selectMappedBufferResult.getSize()];
+            selectMappedBufferResult.getByteBuffer().get(body);
+            request.setBody(body);
+        } catch (UnknownHostException e) {
+            // Previously swallowed silently; keep going as before but leave a trace.
+            log.warn("[consumeMessageDirectly] decode message id failed, msgId=" + requestHeader.getMsgId(), e);
+        } finally {
+            if (selectMappedBufferResult != null) {
+                selectMappedBufferResult.release();
+            }
+        }
+
+        return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),
+            requestHeader.getClientId());
+    }
+
+    /**
+     * Copies consume offsets from a source consumer group to a destination group.
+     *
+     * <p>When the header's topic is blank, every topic reported by
+     * {@code whichTopicByConsumer(srcGroup)} is processed; otherwise only the named topic is.
+     * Topics without a topic config are skipped with a warning. Unless the header is flagged
+     * offline, a topic is also skipped when the source group has subscription data but none
+     * for that topic.
+     *
+     * <p>Each remaining topic is cloned using the loop's own topic. The previous code passed
+     * {@code requestHeader.getTopic()} to {@code cloneOffset}, so a blank-topic request was
+     * cloned with the blank header topic on every iteration instead of the resolved topics.
+     *
+     * @param ctx channel context of the requester; not read by this method
+     * @param request the incoming command carrying a {@code CloneGroupOffsetRequestHeader}
+     * @return a response with code {@code SUCCESS} and a null remark
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        CloneGroupOffsetRequestHeader requestHeader =
+            (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
+
+        Set<String> topics;
+        if (UtilAll.isBlank(requestHeader.getTopic())) {
+            topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
+        } else {
+            topics = new HashSet<String>();
+            topics.add(requestHeader.getTopic());
+        }
+
+        for (String topic : topics) {
+            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+            if (null == topicConfig) {
+                log.warn("[cloneGroupOffset], topic config not exist, {}", topic);
+                continue;
+            }
+
+            // Subscription check is only performed when the request is not flagged offline.
+            if (!requestHeader.isOffline()) {
+
+                SubscriptionData findSubscriptionData =
+                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
+                if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
+                    && findSubscriptionData == null) {
+                    log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
+                    continue;
+                }
+            }
+
+            // Clone the topic being iterated, not the (possibly blank) header topic.
+            this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
+                topic);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns minute, hour and day statistics for one stats item of the broker.
+     *
+     * <p>The stats item is looked up by the header's stats name and key in the stats manager
+     * of the message store. Its three snapshots are copied into a {@code BrokerStatsData} and
+     * encoded into the response body.
+     *
+     * <p>The store is checked with {@code instanceof} before the cast, as
+     * {@code prepareRuntimeInfo} does; a store of another type yields a {@code SYSTEM_ERROR}
+     * response instead of a ClassCastException.
+     *
+     * @param ctx channel context of the requester; not read by this method
+     * @param request the incoming command carrying a {@code ViewBrokerStatsDataRequestHeader}
+     * @return {@code SUCCESS} with the encoded stats, or {@code SYSTEM_ERROR} when the store type
+     *         is unsupported or the stats item does not exist
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final ViewBrokerStatsDataRequestHeader requestHeader =
+            (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("The message store is not a DefaultMessageStore, broker stats are unavailable");
+            return response;
+        }
+        DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+
+        StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(),
+            requestHeader.getStatsKey());
+        if (null == statsItem) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), requestHeader.getStatsKey()));
+            return response;
+        }
+
+        BrokerStatsData brokerStatsData = new BrokerStatsData();
+        brokerStatsData.setStatsMinute(toBrokerStatsItem(statsItem.getStatsDataInMinute()));
+        brokerStatsData.setStatsHour(toBrokerStatsItem(statsItem.getStatsDataInHour()));
+        brokerStatsData.setStatsDay(toBrokerStatsItem(statsItem.getStatsDataInDay()));
+
+        response.setBody(brokerStatsData.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Copies sum, tps and avgpt from a stats snapshot into a new {@code BrokerStatsItem}.
+     */
+    private static BrokerStatsItem toBrokerStatsItem(final StatsSnapshot snapshot) {
+        BrokerStatsItem item = new BrokerStatsItem();
+        item.setSum(snapshot.getSum());
+        item.setTps(snapshot.getTps());
+        item.setAvgpt(snapshot.getAvgpt());
+        return item;
+    }
+
+    /**
+     * Collects consume statistics for every subscription group known to this broker.
+     *
+     * <p>For each group, each topic reported by {@code whichTopicByConsumer(group)} produces one
+     * {@code ConsumeStats}, unless the topic is skipped because:
+     * <ul>
+     *   <li>it has no topic config,</li>
+     *   <li>the header asks for ordered topics only and the topic is not ordered, or</li>
+     *   <li>the group has subscription data but none for this topic.</li>
+     * </ul>
+     * For every write queue of a kept topic the broker max offset and the group's consumer
+     * offset are recorded, both clamped to be non-negative. The store timestamp of the message
+     * just before the consumer offset is recorded when it is positive. The per-topic total
+     * diffs are summed across all groups.
+     *
+     * @param ctx channel context of the requester; not read by this method
+     * @param request the incoming command carrying a {@code GetConsumeStatsInBrokerHeader}
+     * @return a response with code {@code SUCCESS} and the encoded {@code ConsumeStatsList}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetConsumeStatsInBrokerHeader header =
+            (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
+        final boolean orderedOnly = header.isOrder();
+        final ConcurrentHashMap<String, SubscriptionGroupConfig> groupTable =
+            brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
+
+        final List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> statsPerGroup =
+            new ArrayList<Map<String, List<ConsumeStats>>>();
+        long accumulatedDiff = 0L;
+
+        for (String group : groupTable.keySet()) {
+            final Map<String, List<ConsumeStats>> groupEntry = new HashMap<String, List<ConsumeStats>>();
+            final Set<String> groupTopics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group);
+            final List<ConsumeStats> statsOfGroup = new ArrayList<ConsumeStats>();
+
+            for (String topic : groupTopics) {
+                final ConsumeStats topicStats = new ConsumeStats();
+                final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+                if (topicConfig == null) {
+                    log.warn("consumeStats, topic config not exist, {}", topic);
+                    continue;
+                }
+
+                if (orderedOnly && !topicConfig.isOrder()) {
+                    continue;
+                }
+
+                // Skip topics the group has no subscription for, when the group has any subscription data.
+                final SubscriptionData subscription =
+                    this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
+                if (subscription == null
+                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
+                    log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
+                    continue;
+                }
+
+                for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); queueId++) {
+                    final MessageQueue queue = new MessageQueue();
+                    queue.setTopic(topic);
+                    queue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+                    queue.setQueueId(queueId);
+
+                    final OffsetWrapper offsets = new OffsetWrapper();
+                    // Negative offsets are reported as zero.
+                    final long maxOffset =
+                        Math.max(0L, this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId));
+                    final long consumedOffset =
+                        Math.max(0L, this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId));
+                    offsets.setBrokerOffset(maxOffset);
+                    offsets.setConsumerOffset(consumedOffset);
+
+                    // Timestamp of the message just before the consumer offset, if there is one.
+                    final long previousOffset = consumedOffset - 1;
+                    if (previousOffset >= 0) {
+                        final long storeTimestamp =
+                            this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, queueId, previousOffset);
+                        if (storeTimestamp > 0) {
+                            offsets.setLastTimestamp(storeTimestamp);
+                        }
+                    }
+                    topicStats.getOffsetTable().put(queue, offsets);
+                }
+
+                final double groupTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic);
+                topicStats.setConsumeTps(groupTps + topicStats.getConsumeTps());
+                accumulatedDiff += topicStats.computeTotalDiff();
+                statsOfGroup.add(topicStats);
+            }
+
+            groupEntry.put(group, statsOfGroup);
+            statsPerGroup.add(groupEntry);
+        }
+
+        final ConsumeStatsList payload = new ConsumeStatsList();
+        payload.setBrokerAddr(brokerController.getBrokerAddr());
+        payload.setConsumeStatsList(statsPerGroup);
+        payload.setTotalDiff(accumulatedDiff);
+
+        response.setBody(payload.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private HashMap<String, String> prepareRuntimeInfo() {
+        HashMap<String, String> runtimeInfo = 
this.brokerController.getMessageStore().getRuntimeInfo();
+        runtimeInfo.put("brokerVersionDesc", 
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+        runtimeInfo.put("brokerVersion", 
String.valueOf(MQVersion.CURRENT_VERSION));
+
+        runtimeInfo.put("msgPutTotalYesterdayMorning",
+                
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
+        runtimeInfo.put("msgPutTotalTodayMorning", 
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
+        runtimeInfo.put("msgPutTotalTodayNow", 
String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
+
+        runtimeInfo.put("msgGetTotalYesterdayMorning",
+                
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
+        runtimeInfo.put("msgGetTotalTodayMorning", 
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
+        runtimeInfo.put("msgGetTotalTodayNow", 
String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
+
+        runtimeInfo.put("sendThreadPoolQueueSize", 
String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
+
+        runtimeInfo.put("sendThreadPoolQueueCapacity",
+                
String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
+
+        runtimeInfo.put("pullThreadPoolQueueSize", 
String.valueOf(this.brokerController.getPullThreadPoolQueue().size()));
+        runtimeInfo.put("pullThreadPoolQueueCapacity",
+                
String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
+
+        runtimeInfo.put("dispatchBehindBytes", 
String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
+        runtimeInfo.put("pageCacheLockTimeMills", 
String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
+
+        runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", 
String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
+        runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", 
String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
+        runtimeInfo.put("earliestMessageTimeStamp", 
String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
+        runtimeInfo.put("startAcceptSendRequestTimeStamp", 
String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
+        if (this.brokerController.getMessageStore() instanceof 
DefaultMessageStore) {
+            DefaultMessageStore defaultMessageStore = (DefaultMessageStore) 
this.brokerController.getMessageStore();
+            runtimeInfo.put("remainTransientStoreBufferNumbs", 
String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs()));
+            if 
(defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                runtimeInfo.put("remainHowManyDataToCommit", 
MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(),
 false));
+            }
+            runtimeInfo.put("remainHowManyDataToFlush", 
MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToFlush(),
 false));
+        }
+
+        java.io.File commitLogDir = new 
java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+        if (commitLogDir.exists()) {
+            runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, 
Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), 
false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false)));
+        }
+
+        return runtimeInfo;
+    }
+
+    /**
+     * Forwards a request to one consumer client and returns the client's answer.
+     *
+     * <p>The client channel is looked up by consumer group and client id. A new request with
+     * the given request code is created, given the original request's ext fields and body, and
+     * sent through {@code getBroker2Client().callClient}.
+     *
+     * @param requestCode request code for the command sent to the client
+     * @param request original command whose ext fields and body are forwarded
+     * @param consumerGroup consumer group used to locate the client channel
+     * @param clientId client id used to locate the client channel
+     * @return the client's response on success; otherwise a locally built response with
+     *         {@code SYSTEM_ERROR} (client not found, client version below
+     *         {@code V3_1_8_SNAPSHOT}, or any other exception) or {@code CONSUME_MSG_TIMEOUT}
+     *         (on {@code RemotingTimeoutException})
+     * @throws RemotingCommandException declared by the signature; this body contains no explicit throw
+     */
+    private RemotingCommand callConsumer(
+        final int requestCode,
+        final RemotingCommand request,
+        final String consumerGroup,
+        final String clientId) throws RemotingCommandException {
+        final RemotingCommand failure = RemotingCommand.createResponseCommand(null);
+        final ClientChannelInfo channelInfo =
+            this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
+
+        if (channelInfo == null) {
+            failure.setCode(ResponseCode.SYSTEM_ERROR);
+            failure.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));
+            return failure;
+        }
+
+        if (channelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
+            failure.setCode(ResponseCode.SYSTEM_ERROR);
+            failure.setRemark(String.format(
+                "The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",
+                clientId,
+                MQVersion.getVersionDesc(channelInfo.getVersion())));
+            return failure;
+        }
+
+        try {
+            final RemotingCommand forwarded = RemotingCommand.createRequestCommand(requestCode, null);
+            forwarded.setExtFields(request.getExtFields());
+            forwarded.setBody(request.getBody());
+
+            return this.brokerController.getBroker2Client().callClient(channelInfo.getChannel(), forwarded);
+        } catch (RemotingTimeoutException e) {
+            failure.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
+            failure.setRemark(String.format("consumer <%s> <%s> Timeout: %s",
+                consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
+            return failure;
+        } catch (Exception e) {
+            failure.setCode(ResponseCode.SYSTEM_ERROR);
+            failure.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s",
+                consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
+            return failure;
+        }
+    }
+
+}

Reply via email to