http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
new file mode 100644
index 0000000..bdceeb0
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import 
org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClientManageProcessor implements NettyRequestProcessor {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+
+    public ClientManageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        switch (request.getCode()) {
+            case RequestCode.HEART_BEAT:
+                return this.heartBeat(ctx, request);
+            case RequestCode.UNREGISTER_CLIENT:
+                return this.unregisterClient(ctx, request);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    public RemotingCommand heartBeat(ChannelHandlerContext ctx, 
RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), 
HeartbeatData.class);
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                ctx.channel(),
+                heartbeatData.getClientID(),
+                request.getLanguage(),
+                request.getVersion()
+        );
+
+        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
+            SubscriptionGroupConfig subscriptionGroupConfig =
+                    
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
+                            data.getGroupName());
+            boolean isNotifyConsumerIdsChangedEnable = true;
+            if (null != subscriptionGroupConfig) {
+                isNotifyConsumerIdsChangedEnable = 
subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+                int topicSysFlag = 0;
+                if (data.isUnitMode()) {
+                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+                }
+                String newTopic = MixAll.getRetryTopic(data.getGroupName());
+                
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+                        newTopic,
+                        subscriptionGroupConfig.getRetryQueueNums(),
+                        PermName.PERM_WRITE | PermName.PERM_READ, 
topicSysFlag);
+            }
+
+            boolean changed = 
this.brokerController.getConsumerManager().registerConsumer(
+                    data.getGroupName(),
+                    clientChannelInfo,
+                    data.getConsumeType(),
+                    data.getMessageModel(),
+                    data.getConsumeFromWhere(),
+                    data.getSubscriptionDataSet(),
+                    isNotifyConsumerIdsChangedEnable
+            );
+
+            if (changed) {
+                log.info("registerConsumer info changed {} {}",
+                        data.toString(),
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+                );
+            }
+        }
+
+        for (ProducerData data : heartbeatData.getProducerDataSet()) {
+            
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
+                    clientChannelInfo);
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    public RemotingCommand unregisterClient(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response =
+                
RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+        final UnregisterClientRequestHeader requestHeader =
+                (UnregisterClientRequestHeader) request
+                        
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                ctx.channel(),
+                requestHeader.getClientID(),
+                request.getLanguage(),
+                request.getVersion());
+        {
+            final String group = requestHeader.getProducerGroup();
+            if (group != null) {
+                
this.brokerController.getProducerManager().unregisterProducer(group, 
clientChannelInfo);
+            }
+        }
+
+        {
+            final String group = requestHeader.getConsumerGroup();
+            if (group != null) {
+                SubscriptionGroupConfig subscriptionGroupConfig =
+                        
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
+                boolean isNotifyConsumerIdsChangedEnable = true;
+                if (null != subscriptionGroupConfig) {
+                    isNotifyConsumerIdsChangedEnable = 
subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+                }
+                
this.brokerController.getConsumerManager().unregisterConsumer(group, 
clientChannelInfo, isNotifyConsumerIdsChangedEnable);
+            }
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
new file mode 100644
index 0000000..09a2607
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.protocol.header.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerManageProcessor implements NettyRequestProcessor {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final BrokerController brokerController;
+
+
+    public ConsumerManageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        switch (request.getCode()) {
+            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
+                return this.getConsumerListByGroup(ctx, request);
+            case RequestCode.UPDATE_CONSUMER_OFFSET:
+                return this.updateConsumerOffset(ctx, request);
+            case RequestCode.QUERY_CONSUMER_OFFSET:
+                return this.queryConsumerOffset(ctx, request);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+
+    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response =
+                
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+        final GetConsumerListByGroupRequestHeader requestHeader =
+                (GetConsumerListByGroupRequestHeader) request
+                        
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+
+        ConsumerGroupInfo consumerGroupInfo =
+                
this.brokerController.getConsumerManager().getConsumerGroupInfo(
+                        requestHeader.getConsumerGroup());
+        if (consumerGroupInfo != null) {
+            List<String> clientIds = consumerGroupInfo.getAllClientId();
+            if (!clientIds.isEmpty()) {
+                GetConsumerListByGroupResponseBody body = new 
GetConsumerListByGroupResponseBody();
+                body.setConsumerIdList(clientIds);
+                response.setBody(body.encode());
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+                return response;
+            } else {
+                log.warn("getAllClientId failed, {} {}", 
requestHeader.getConsumerGroup(),
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+            }
+        } else {
+            log.warn("getConsumerGroupInfo failed, {} {}", 
requestHeader.getConsumerGroup(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        }
+
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark("no consumer for this group, " + 
requestHeader.getConsumerGroup());
+        return response;
+    }
+
+    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response =
+                
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+        final UpdateConsumerOffsetRequestHeader requestHeader =
+                (UpdateConsumerOffsetRequestHeader) request
+                        
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+        
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
 requestHeader.getConsumerGroup(),
+                requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getCommitOffset());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response =
+                
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+        final QueryConsumerOffsetResponseHeader responseHeader =
+                (QueryConsumerOffsetResponseHeader) 
response.readCustomHeader();
+        final QueryConsumerOffsetRequestHeader requestHeader =
+                (QueryConsumerOffsetRequestHeader) request
+                        
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+
+        long offset =
+                this.brokerController.getConsumerOffsetManager().queryOffset(
+                        requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId());
+
+
+        if (offset >= 0) {
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            long minOffset =
+                    
this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
+                            requestHeader.getQueueId());
+            if (minOffset <= 0
+                    && 
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
+                    requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+                responseHeader.setOffset(0L);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+            } else {
+                response.setCode(ResponseCode.QUERY_NOT_FOUND);
+                response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this 
group consumer boot first");
+            }
+        }
+
+        return response;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
new file mode 100644
index 0000000..fc38238
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class EndTransactionProcessor implements NettyRequestProcessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+    private final BrokerController brokerController;
+
+    public EndTransactionProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        final EndTransactionRequestHeader requestHeader =
+                (EndTransactionRequestHeader) 
request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
+
+
+        if (requestHeader.getFromTransactionCheck()) {
+            switch (requestHeader.getCommitOrRollback()) {
+                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
+                    LOGGER.warn("check producer[{}] transaction state, but 
it's pending status."
+                                    + "RequestHeader: {} Remark: {}",
+                            
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                            requestHeader.toString(),
+                            request.getRemark());
+                    return null;
+                }
+
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
+                    LOGGER.warn("check producer[{}] transaction state, the 
producer commit the message."
+                                    + "RequestHeader: {} Remark: {}",
+                            
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                            requestHeader.toString(),
+                            request.getRemark());
+
+                    break;
+                }
+
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
+                    LOGGER.warn("check producer[{}] transaction state, the 
producer rollback the message."
+                                    + "RequestHeader: {} Remark: {}",
+                            
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                            requestHeader.toString(),
+                            request.getRemark());
+                    break;
+                }
+                default:
+                    return null;
+            }
+        } else {
+            switch (requestHeader.getCommitOrRollback()) {
+                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
+                    LOGGER.warn("the producer[{}] end transaction in sending 
message,  and it's pending status."
+                                    + "RequestHeader: {} Remark: {}",
+                            
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                            requestHeader.toString(),
+                            request.getRemark());
+                    return null;
+                }
+
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
+                    break;
+                }
+
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
+                    LOGGER.warn("the producer[{}] end transaction in sending 
message, rollback the message."
+                                    + "RequestHeader: {} Remark: {}",
+                            
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                            requestHeader.toString(),
+                            request.getRemark());
+                    break;
+                }
+                default:
+                    return null;
+            }
+        }
+
+        final MessageExt msgExt = 
this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
+        if (msgExt != null) {
+            final String pgroupRead = 
msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
+            if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("the producer group wrong");
+                return response;
+            }
+
+            if (msgExt.getQueueOffset() != 
requestHeader.getTranStateTableOffset()) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("the transaction state table offset wrong");
+                return response;
+            }
+
+            if (msgExt.getCommitLogOffset() != 
requestHeader.getCommitLogOffset()) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("the commit log offset wrong");
+                return response;
+            }
+
+            MessageExtBrokerInner msgInner = 
this.endMessageTransaction(msgExt);
+            
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), 
requestHeader.getCommitOrRollback()));
+
+            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
+            
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
+            msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
+            if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == 
requestHeader.getCommitOrRollback()) {
+                msgInner.setBody(null);
+            }
+
+            final MessageStore messageStore = 
this.brokerController.getMessageStore();
+            final PutMessageResult putMessageResult = 
messageStore.putMessage(msgInner);
+            if (putMessageResult != null) {
+                switch (putMessageResult.getPutMessageStatus()) {
+                    // Success
+                    case PUT_OK:
+                    case FLUSH_DISK_TIMEOUT:
+                    case FLUSH_SLAVE_TIMEOUT:
+                    case SLAVE_NOT_AVAILABLE:
+                        response.setCode(ResponseCode.SUCCESS);
+                        response.setRemark(null);
+                        break;
+                    // Failed
+                    case CREATE_MAPEDFILE_FAILED:
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("create maped file failed.");
+                        break;
+                    case MESSAGE_ILLEGAL:
+                    case PROPERTIES_SIZE_EXCEEDED:
+                        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                        response.setRemark("the message is illegal, maybe msg 
body or properties length not matched. msg body length limit 128k, msg 
properties length limit 32k.");
+                        break;
+                    case SERVICE_NOT_AVAILABLE:
+                        response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+                        response.setRemark("service not available now.");
+                        break;
+                    case OS_PAGECACHE_BUSY:
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("OS page cache busy, please try 
another machine");
+                        break;
+                    case UNKNOWN_ERROR:
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("UNKNOWN_ERROR");
+                        break;
+                    default:
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("UNKNOWN_ERROR DEFAULT");
+                        break;
+                }
+
+                return response;
+            } else {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("store putMessage return null");
+            }
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("find prepared transaction message failed");
+            return response;
+        }
+
+        return response;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setBody(msgExt.getBody());
+        msgInner.setFlag(msgExt.getFlag());
+        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+
+        TopicFilterType topicFilterType =
+                (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == 
MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
+                        : TopicFilterType.SINGLE_TAG;
+        long tagsCodeValue = 
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
+        msgInner.setTagsCode(tagsCodeValue);
+        
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+
+        msgInner.setSysFlag(msgExt.getSysFlag());
+        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+        msgInner.setBornHost(msgExt.getBornHost());
+        msgInner.setStoreHost(msgExt.getStoreHost());
+        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+
+        msgInner.setWaitStoreMsgOK(false);
+        MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+
+        msgInner.setTopic(msgExt.getTopic());
+        msgInner.setQueueId(msgExt.getQueueId());
+
+        return msgInner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
new file mode 100644
index 0000000..acf25ea
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ForwardRequestProcessor implements NettyRequestProcessor {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final BrokerController brokerController;
+
+
+    public ForwardRequestProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) {
+        return null;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
new file mode 100644
index 0000000..3094079
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import io.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullMessageProcessor implements NettyRequestProcessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    private List<ConsumeMessageHook> consumeMessageHookList;
+
+    public PullMessageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(final ChannelHandlerContext ctx, 
RemotingCommand request) throws RemotingCommandException {
+        return this.processRequest(ctx.channel(), request, true);
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request, boolean brokerAllowSuspend)
+            throws RemotingCommandException {
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        final PullMessageResponseHeader responseHeader = 
(PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageRequestHeader requestHeader =
+                (PullMessageRequestHeader) 
request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+
+        response.setOpaque(request.getOpaque());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive PullMessage request command, " + request);
+        }
+
+
+        if 
(!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission()))
 {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the broker[" + 
this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is 
forbidden");
+            return response;
+        }
+
+
+        SubscriptionGroupConfig subscriptionGroupConfig =
+                
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
+        if (null == subscriptionGroupConfig) {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark("subscription group not exist, " + 
requestHeader.getConsumerGroup() + " "
+                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+            return response;
+        }
+
+
+        if (!subscriptionGroupConfig.isConsumeEnable()) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("subscription group no permission, " + 
requestHeader.getConsumerGroup());
+            return response;
+        }
+
+        final boolean hasSuspendFlag = 
PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
+        final boolean hasCommitOffsetFlag = 
PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
+        final boolean hasSubscriptionFlag = 
PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+        final long suspendTimeoutMillisLong = hasSuspendFlag ? 
requestHeader.getSuspendTimeoutMillis() : 0;
+
+
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        if (null == topicConfig) {
+            LOG.error("the topic " + requestHeader.getTopic() + " not exist, 
consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark(
+                    "topic[" + requestHeader.getTopic() + "] not exist, apply 
first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+            return response;
+        }
+
+
+        if (!PermName.isReadable(topicConfig.getPerm())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the topic[" + requestHeader.getTopic() + "] 
pulling message is forbidden");
+            return response;
+        }
+
+
+        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= 
topicConfig.getReadQueueNums()) {
+            String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is 
illagal,Topic :" + requestHeader.getTopic()
+                    + " topicConfig.readQueueNums: " + 
topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
+            LOG.warn(errorInfo);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorInfo);
+            return response;
+        }
+
+
+        SubscriptionData subscriptionData = null;
+        if (hasSubscriptionFlag) {
+            try {
+                subscriptionData = 
FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), 
requestHeader.getTopic(),
+                        requestHeader.getSubscription());
+            } catch (Exception e) {
+                LOG.warn("parse the consumer's subscription[{}] failed, group: 
{}", requestHeader.getSubscription(), //
+                        requestHeader.getConsumerGroup());
+                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+                response.setRemark("parse the consumer's subscription failed");
+                return response;
+            }
+        } else {
+            ConsumerGroupInfo consumerGroupInfo =
+                    
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            if (null == consumerGroupInfo) {
+                LOG.warn("the consumer's group info not exist, group: {}", 
requestHeader.getConsumerGroup());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                response.setRemark("the consumer's group info not exist" + 
FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+                return response;
+            }
+
+            if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
+                    && consumerGroupInfo.getMessageModel() == 
MessageModel.BROADCASTING) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark("the consumer group[" + 
requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
+                return response;
+            }
+
+            subscriptionData = 
consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
+            if (null == subscriptionData) {
+                LOG.warn("the consumer's subscription not exist, group: {}, 
topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                response.setRemark("the consumer's subscription not exist" + 
FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+                return response;
+            }
+
+
+            if (subscriptionData.getSubVersion() < 
requestHeader.getSubVersion()) {
+                LOG.warn("the broker's subscription is not latest, group: {} 
{}", requestHeader.getConsumerGroup(),
+                        subscriptionData.getSubString());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+                response.setRemark("the consumer's subscription not latest");
+                return response;
+            }
+        }
+
+        final GetMessageResult getMessageResult =
+                
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(),
+                        requestHeader.getQueueId(), 
requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), 
subscriptionData);
+        if (getMessageResult != null) {
+            response.setRemark(getMessageResult.getStatus().name());
+            
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
+            responseHeader.setMinOffset(getMessageResult.getMinOffset());
+            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
+
+
+            if (getMessageResult.isSuggestPullingFromSlave()) {
+                
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
+            } else {
+                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+            }
+
+            switch 
(this.brokerController.getMessageStoreConfig().getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    break;
+                case SLAVE:
+                    if 
(!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+                    }
+                    break;
+            }
+
+            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+                // consume too slow ,redirect to another machine
+                if (getMessageResult.isSuggestPullingFromSlave()) {
+                    
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
+                }
+                // consume ok
+                else {
+                    
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+                }
+            } else {
+                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+            }
+
+            switch (getMessageResult.getStatus()) {
+                case FOUND:
+                    response.setCode(ResponseCode.SUCCESS);
+                    break;
+                case MESSAGE_WAS_REMOVING:
+                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                    break;
+                case NO_MATCHED_LOGIC_QUEUE:
+                case NO_MESSAGE_IN_QUEUE:
+                    if (0 != requestHeader.getQueueOffset()) {
+                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+
+                        // XXX: warn and notify me
+                        LOG.info("the broker store no queue data, fix the 
request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
+                                requestHeader.getQueueOffset(), //
+                                getMessageResult.getNextBeginOffset(), //
+                                requestHeader.getTopic(), //
+                                requestHeader.getQueueId(), //
+                                requestHeader.getConsumerGroup()//
+                        );
+                    } else {
+                        response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    }
+                    break;
+                case NO_MATCHED_MESSAGE:
+                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                    break;
+                case OFFSET_FOUND_NULL:
+                    response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    break;
+                case OFFSET_OVERFLOW_BADLY:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    // XXX: warn and notify me
+                    LOG.info("the request offset: " + 
requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
+                            + getMessageResult.getMaxOffset() + ", consumer: " 
+ channel.remoteAddress());
+                    break;
+                case OFFSET_OVERFLOW_ONE:
+                    response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    break;
+                case OFFSET_TOO_SMALL:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    LOG.info("the request offset too small. group={}, 
topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+                            requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueOffset(),
+                            getMessageResult.getMinOffset(), 
channel.remoteAddress());
+                    break;
+                default:
+                    assert false;
+                    break;
+            }
+
+            if (this.hasConsumeMessageHook()) {
+                ConsumeMessageContext context = new ConsumeMessageContext();
+                context.setConsumerGroup(requestHeader.getConsumerGroup());
+                context.setTopic(requestHeader.getTopic());
+                context.setQueueId(requestHeader.getQueueId());
+
+                String owner = 
request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS:
+                        int commercialBaseCount = 
brokerController.getBrokerConfig().getCommercialBaseCount();
+                        int incValue = 
getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
+
+                        
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
+                        context.setCommercialRcvTimes(incValue);
+                        
context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
+                        context.setCommercialOwner(owner);
+
+                        break;
+                    case ResponseCode.PULL_NOT_FOUND:
+                        if (!brokerAllowSuspend) {
+
+
+                            
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+                            context.setCommercialRcvTimes(1);
+                            context.setCommercialOwner(owner);
+
+                        }
+                        break;
+                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                    case ResponseCode.PULL_OFFSET_MOVED:
+                        
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+                        context.setCommercialRcvTimes(1);
+                        context.setCommercialOwner(owner);
+                        break;
+                    default:
+                        assert false;
+                        break;
+                }
+
+                this.executeConsumeMessageHookBefore(context);
+            }
+
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS:
+
+                    
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(),
+                            getMessageResult.getMessageCount());
+
+                    
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(),
+                            getMessageResult.getBufferTotalSize());
+
+                    
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+                    if 
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
+                        final long beginTimeMills = 
this.brokerController.getMessageStore().now();
+                        final byte[] r = 
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId());
+                        
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
+                                requestHeader.getTopic(), 
requestHeader.getQueueId(),
+                                (int) 
(this.brokerController.getMessageStore().now() - beginTimeMills));
+                        response.setBody(r);
+                    } else {
+                        try {
+                            FileRegion fileRegion =
+                                    new 
ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()),
 getMessageResult);
+                            channel.writeAndFlush(fileRegion).addListener(new 
ChannelFutureListener() {
+                                @Override
+                                public void operationComplete(ChannelFuture 
future) throws Exception {
+                                    getMessageResult.release();
+                                    if (!future.isSuccess()) {
+                                        LOG.error("transfer many message by 
pagecache failed, " + channel.remoteAddress(), future.cause());
+                                    }
+                                }
+                            });
+                        } catch (Throwable e) {
+                            LOG.error("transfer many message by pagecache 
exception", e);
+                            getMessageResult.release();
+                        }
+
+                        response = null;
+                    }
+                    break;
+                case ResponseCode.PULL_NOT_FOUND:
+
+                    if (brokerAllowSuspend && hasSuspendFlag) {
+                        long pollingTimeMills = suspendTimeoutMillisLong;
+                        if 
(!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
+                            pollingTimeMills = 
this.brokerController.getBrokerConfig().getShortPollingTimeMills();
+                        }
+
+                        String topic = requestHeader.getTopic();
+                        long offset = requestHeader.getQueueOffset();
+                        int queueId = requestHeader.getQueueId();
+                        PullRequest pullRequest = new PullRequest(request, 
channel, pollingTimeMills,
+                                this.brokerController.getMessageStore().now(), 
offset, subscriptionData);
+                        
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, 
queueId, pullRequest);
+                        response = null;
+                        break;
+                    }
+
+
+                case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                    break;
+                case ResponseCode.PULL_OFFSET_MOVED:
+                    if 
(this.brokerController.getMessageStoreConfig().getBrokerRole() != 
BrokerRole.SLAVE
+                            || 
this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+                        MessageQueue mq = new MessageQueue();
+                        mq.setTopic(requestHeader.getTopic());
+                        mq.setQueueId(requestHeader.getQueueId());
+                        
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+                        OffsetMovedEvent event = new OffsetMovedEvent();
+                        
event.setConsumerGroup(requestHeader.getConsumerGroup());
+                        event.setMessageQueue(mq);
+                        event.setOffsetRequest(requestHeader.getQueueOffset());
+                        
event.setOffsetNew(getMessageResult.getNextBeginOffset());
+                        this.generateOffsetMovedEvent(event);
+                        LOG.warn(
+                                "PULL_OFFSET_MOVED:correction offset. 
topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+                                requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), event.getOffsetRequest(), 
event.getOffsetNew(),
+                                responseHeader.getSuggestWhichBrokerId());
+                    } else {
+                        
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, 
groupId={}, requestOffset={}, suggestBrokerId={}",
+                                requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+                                responseHeader.getSuggestWhichBrokerId());
+                    }
+
+                    break;
+                default:
+                    assert false;
+            }
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store getMessage return null");
+        }
+
+
+        boolean storeOffsetEnable = brokerAllowSuspend;
+        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
+        storeOffsetEnable = storeOffsetEnable
+                && 
this.brokerController.getMessageStoreConfig().getBrokerRole() != 
BrokerRole.SLAVE;
+        if (storeOffsetEnable) {
+            
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
+                    requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getCommitOffset());
+        }
+        return response;
+    }
+
+
+    public boolean hasConsumeMessageHook() {
+        return consumeMessageHookList != null && 
!this.consumeMessageHookList.isEmpty();
+    }
+
+    public void executeConsumeMessageHookBefore(final ConsumeMessageContext 
context) {
+        if (hasConsumeMessageHook()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageBefore(context);
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+
+    private byte[] readGetMessageResult(final GetMessageResult 
getMessageResult, final String group, final String topic, final int queueId) {
+        final ByteBuffer byteBuffer = 
ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
+
+        long storeTimestamp = 0;
+        try {
+            List<ByteBuffer> messageBufferList = 
getMessageResult.getMessageBufferList();
+            for (ByteBuffer bb : messageBufferList) {
+
+                byteBuffer.put(bb);
+                storeTimestamp = 
bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+            }
+        } finally {
+            getMessageResult.release();
+        }
+
+        
this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, 
topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
+        return byteBuffer.array();
+    }
+
+    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
+        try {
+            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
+            msgInner.setTags(event.getConsumerGroup());
+            msgInner.setDelayTimeLevel(0);
+            msgInner.setKeys(event.getConsumerGroup());
+            msgInner.setBody(event.encode());
+            msgInner.setFlag(0);
+            
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+            
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG,
 msgInner.getTags()));
+
+            msgInner.setQueueId(0);
+            msgInner.setSysFlag(0);
+            msgInner.setBornTimestamp(System.currentTimeMillis());
+            
msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
+            msgInner.setStoreHost(msgInner.getBornHost());
+
+            msgInner.setReconsumeTimes(0);
+
+            PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
+        } catch (Exception e) {
+            LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", 
event.toString()), e);
+        }
+    }
+
+    public void excuteRequestWhenWakeup(final Channel channel, final 
RemotingCommand request) throws RemotingCommandException {
+        Runnable run = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final RemotingCommand response = 
PullMessageProcessor.this.processRequest(channel, request, false);
+
+                    if (response != null) {
+                        response.setOpaque(request.getOpaque());
+                        response.markResponseType();
+                        try {
+                            channel.writeAndFlush(response).addListener(new 
ChannelFutureListener() {
+                                @Override
+                                public void operationComplete(ChannelFuture 
future) throws Exception {
+                                    if (!future.isSuccess()) {
+                                        LOG.error("processRequestWrapper 
response to " + future.channel().remoteAddress() + " failed",
+                                                future.cause());
+                                        LOG.error(request.toString());
+                                        LOG.error(response.toString());
+                                    }
+                                }
+                            });
+                        } catch (Throwable e) {
+                            LOG.error("processRequestWrapper process request 
over, but response failed", e);
+                            LOG.error(request.toString());
+                            LOG.error(response.toString());
+                        }
+                    }
+                } catch (RemotingCommandException e1) {
+                    LOG.error("excuteRequestWhenWakeup run", e1);
+                }
+            }
+        };
+
+        this.brokerController.getPullMessageExecutor().submit(run);
+    }
+
+    public void registerConsumeMessageHook(List<ConsumeMessageHook> 
sendMessageHookList) {
+        this.consumeMessageHookList = sendMessageHookList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
new file mode 100644
index 0000000..5390e28
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
+import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.QueryMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryMessageProcessor implements NettyRequestProcessor {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final BrokerController brokerController;
+
+
+    public QueryMessageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        switch (request.getCode()) {
+            case RequestCode.QUERY_MESSAGE:
+                return this.queryMessage(ctx, request);
+            case RequestCode.VIEW_MESSAGE_BY_ID:
+                return this.viewMessageById(ctx, request);
+            default:
+                break;
+        }
+
+        return null;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+
+    public RemotingCommand queryMessage(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response =
+                
RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
+        final QueryMessageResponseHeader responseHeader =
+                (QueryMessageResponseHeader) response.readCustomHeader();
+        final QueryMessageRequestHeader requestHeader =
+                (QueryMessageRequestHeader) request
+                        
.decodeCommandCustomHeader(QueryMessageRequestHeader.class);
+
+
+        response.setOpaque(request.getOpaque());
+
+
+        String isUniqueKey = 
request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
+        if (isUniqueKey != null && isUniqueKey.equals("true")) {
+            
requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
+        }
+
+        final QueryMessageResult queryMessageResult =
+                
this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
+                        requestHeader.getKey(), requestHeader.getMaxNum(), 
requestHeader.getBeginTimestamp(),
+                        requestHeader.getEndTimestamp());
+        assert queryMessageResult != null;
+
+        
responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
+        
responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
+
+
+        if (queryMessageResult.getBufferTotalSize() > 0) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+
+            try {
+                FileRegion fileRegion =
+                        new 
QueryMessageTransfer(response.encodeHeader(queryMessageResult
+                                .getBufferTotalSize()), queryMessageResult);
+                ctx.channel().writeAndFlush(fileRegion).addListener(new 
ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws 
Exception {
+                        queryMessageResult.release();
+                        if (!future.isSuccess()) {
+                            log.error("transfer query message by pagecache 
failed, ", future.cause());
+                        }
+                    }
+                });
+            } catch (Throwable e) {
+                log.error("", e);
+                queryMessageResult.release();
+            }
+
+            return null;
+        }
+
+        response.setCode(ResponseCode.QUERY_NOT_FOUND);
+        response.setRemark("can not find message, maybe time range not 
correct");
+        return response;
+    }
+
+
+    public RemotingCommand viewMessageById(ChannelHandlerContext ctx, 
RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        final ViewMessageRequestHeader requestHeader =
+                (ViewMessageRequestHeader) 
request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
+
+
+        response.setOpaque(request.getOpaque());
+
+        final SelectMappedBufferResult selectMappedBufferResult =
+                
this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
+        if (selectMappedBufferResult != null) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+
+            try {
+                FileRegion fileRegion =
+                        new 
OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
+                                selectMappedBufferResult);
+                ctx.channel().writeAndFlush(fileRegion).addListener(new 
ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws 
Exception {
+                        selectMappedBufferResult.release();
+                        if (!future.isSuccess()) {
+                            log.error("transfer one message by pagecache 
failed, ", future.cause());
+                        }
+                    }
+                });
+            } catch (Throwable e) {
+                log.error("", e);
+                selectMappedBufferResult.release();
+            }
+
+            return null;
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("can not find message by the offset, " + 
requestHeader.getOffset());
+        }
+
+        return response;
+    }
+}

Reply via email to