http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
new file mode 100644
index 0000000..c4e91a3
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.QueryResult;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Default pulling consumer
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMQPullConsumer extends ClientConfig implements 
MQPullConsumer {
+    protected final transient DefaultMQPullConsumerImpl 
defaultMQPullConsumerImpl;
+
+    /**
+     * Consumers that do the same thing share the same group. The application
+     * must set it, and it must be globally unique
+     */
+    private String consumerGroup;
+    /**
+     * In long polling mode, the maximum time the consumer connection is
+     * suspended. Modifying it is not recommended
+     */
+    private long brokerSuspendMaxTimeMillis = 1000 * 20;
+    /**
+     * In long polling mode, the consumer connection timeout (must be greater
+     * than brokerSuspendMaxTimeMillis). Modifying it is not recommended
+     */
+    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
+    /**
+     * The socket timeout in milliseconds
+     */
+    private long consumerPullTimeoutMillis = 1000 * 10;
+    /**
+     * Consumption pattern, default is clustering
+     */
+    private MessageModel messageModel = MessageModel.CLUSTERING;
+    /**
+     * Message queue listener
+     */
+    private MessageQueueListener messageQueueListener;
+    /**
+     * Offset Storage
+     */
+    private OffsetStore offsetStore;
+    /**
+     * Topic set you want to register
+     */
+    private Set<String> registerTopics = new HashSet<String>();
+    /**
+     * Queue allocation algorithm
+     */
+    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new 
AllocateMessageQueueAveragely();
+    /**
+     * Whether the unit of subscription group
+     */
+    private boolean unitMode = false;
+
+    private int maxReconsumeTimes = 16;
+
+
+    public DefaultMQPullConsumer() {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, null);
+    }
+
+
+    public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
+        this.consumerGroup = consumerGroup;
+        defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, 
rpcHook);
+    }
+
+
+    public DefaultMQPullConsumer(final String consumerGroup) {
+        this(consumerGroup, null);
+    }
+
+
+    public DefaultMQPullConsumer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws 
MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
+        this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, 
topicSysFlag);
+    }
+
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
+        return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
+    }
+
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.maxOffset(mq);
+    }
+
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.minOffset(mq);
+    }
+
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
+        return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
+    }
+
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws 
RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
+    }
+
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, 
begin, end);
+    }
+
+
+    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+        return allocateMessageQueueStrategy;
+    }
+
+
+    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy 
allocateMessageQueueStrategy) {
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+    }
+
+
+    public long getBrokerSuspendMaxTimeMillis() {
+        return brokerSuspendMaxTimeMillis;
+    }
+
+
+    public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) 
{
+        this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public long getConsumerPullTimeoutMillis() {
+        return consumerPullTimeoutMillis;
+    }
+
+
+    public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
+        this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
+    }
+
+
+    public long getConsumerTimeoutMillisWhenSuspend() {
+        return consumerTimeoutMillisWhenSuspend;
+    }
+
+
+    public void setConsumerTimeoutMillisWhenSuspend(long 
consumerTimeoutMillisWhenSuspend) {
+        this.consumerTimeoutMillisWhenSuspend = 
consumerTimeoutMillisWhenSuspend;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public MessageQueueListener getMessageQueueListener() {
+        return messageQueueListener;
+    }
+
+
+    public void setMessageQueueListener(MessageQueueListener 
messageQueueListener) {
+        this.messageQueueListener = messageQueueListener;
+    }
+
+
+    public Set<String> getRegisterTopics() {
+        return registerTopics;
+    }
+
+
+    public void setRegisterTopics(Set<String> registerTopics) {
+        this.registerTopics = registerTopics;
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, 
brokerName);
+    }
+
+    @Override
+    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws 
MQClientException {
+        return 
this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
+    }
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQPullConsumerImpl.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.defaultMQPullConsumerImpl.shutdown();
+    }
+
+    @Override
+    public void registerMessageQueueListener(String topic, 
MessageQueueListener listener) {
+        synchronized (this.registerTopics) {
+            this.registerTopics.add(topic);
+            if (listener != null) {
+                this.messageQueueListener = listener;
+            }
+        }
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, String subExpression, long offset, 
int maxNums)
+            throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums);
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, String subExpression, long offset, 
int maxNums, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, timeout);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, PullCallback pullCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, pullCallback);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, PullCallback pullCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, pullCallback, timeout);
+    }
+
+    @Override
+    public PullResult pullBlockIfNotFound(MessageQueue mq, String 
subExpression, long offset, int maxNums)
+            throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
+        return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, 
subExpression, offset, maxNums);
+    }
+
+    @Override
+    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, 
long offset, int maxNums, PullCallback pullCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, 
offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void updateConsumeOffset(MessageQueue mq, long offset) throws 
MQClientException {
+        this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
+    }
+
+    @Override
+    public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws 
MQClientException {
+        return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, 
fromStore);
+    }
+
+    @Override
+    public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws 
MQClientException {
+        return 
this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
+    }
+
+    @Override
+    public MessageExt viewMessage(String topic, String uniqKey) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            MessageDecoder.decodeMessageId(uniqKey);
+            return this.viewMessage(uniqKey);
+        } catch (Exception e) {
+        }
+        return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, 
uniqKey);
+    }
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName, String consumerGroup)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, 
brokerName, consumerGroup);
+    }
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+
+    public void setOffsetStore(OffsetStore offsetStore) {
+        this.offsetStore = offsetStore;
+    }
+
+
+    public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
+        return defaultMQPullConsumerImpl;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    public int getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+
+    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
new file mode 100644
index 0000000..cbed53b
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -0,0 +1,519 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Wrapped push consumer. In fact, it works as remarkably as the pull consumer
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMQPushConsumer extends ClientConfig implements 
MQPushConsumer {
+    protected final transient DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl;
+    /**
+     * Consumers that do the same thing share the same group. The application
+     * must set it, and it must be globally unique
+     */
+    private String consumerGroup;
+    /**
+     * Consumption pattern, default is clustering
+     */
+    private MessageModel messageModel = MessageModel.CLUSTERING;
+    /**
+     * Consumption offset
+     */
+    private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+    /**
+     * Backtracking consumption time with second precision. Time format is
+     * 20131223171201<br>
+     * Implying 17:12:01 on December 23, 2013<br>
+     * Default backtracking consumption time is half an hour ago
+     */
+    private String consumeTimestamp = 
UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+    /**
+     * Queue allocation algorithm
+     */
+    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
+
+    /**
+     * Subscription relationship
+     */
+    private Map<String /* topic */, String /* sub expression */> subscription 
= new HashMap<String, String>();
+    /**
+     * Message listener
+     */
+    private MessageListener messageListener;
+    /**
+     * Offset Storage
+     */
+    private OffsetStore offsetStore;
+    /**
+     * Minimum consumer thread number
+     */
+    private int consumeThreadMin = 20;
+    /**
+     * Max consumer thread number
+     */
+    private int consumeThreadMax = 64;
+
+    /**
+     * Threshold for dynamic adjustment of the number of thread pool
+     */
+    private long adjustThreadPoolNumsThreshold = 100000;
+
+    /**
+     * Max offset span for concurrent consumption. It has no effect on sequential consumption
+     */
+    private int consumeConcurrentlyMaxSpan = 2000;
+    /**
+     * Flow control threshold
+     */
+    private int pullThresholdForQueue = 1000;
+    /**
+     * Message pull Interval
+     */
+    private long pullInterval = 0;
+    /**
+     * Batch consumption size
+     */
+    private int consumeMessageBatchMaxSize = 1;
+    /**
+     * Batch pull size
+     */
+    private int pullBatchSize = 32;
+
+    /**
+     * Whether to update the subscription relationship on every pull
+     */
+    private boolean postSubscriptionWhenPull = false;
+
+    /**
+     * Whether the unit of subscription group
+     */
+    private boolean unitMode = false;
+
+    private int maxReconsumeTimes = -1;
+    private long suspendCurrentQueueTimeMillis = 1000;
+    private long consumeTimeout = 15;
+
+
+    public DefaultMQPushConsumer() {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new 
AllocateMessageQueueAveragely());
+    }
+
+
+    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, 
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+        this.consumerGroup = consumerGroup;
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
+    }
+
+
+    public DefaultMQPushConsumer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new 
AllocateMessageQueueAveragely());
+    }
+
+
+    public DefaultMQPushConsumer(final String consumerGroup) {
+        this(consumerGroup, null, new AllocateMessageQueueAveragely());
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws 
MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
+        this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, 
topicSysFlag);
+    }
+
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
+        return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
+    }
+
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.maxOffset(mq);
+    }
+
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.minOffset(mq);
+    }
+
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
+        return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
+    }
+
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
+    }
+
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, 
begin, end);
+    }
+
+    @Override
+    public MessageExt viewMessage(String topic, String msgId) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            MessageDecoder.decodeMessageId(msgId);
+            return this.viewMessage(msgId);
+        } catch (Exception e) {
+        }
+        return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, 
msgId);
+    }
+
+    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+        return allocateMessageQueueStrategy;
+    }
+
+
+    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy 
allocateMessageQueueStrategy) {
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+    }
+
+
+    public int getConsumeConcurrentlyMaxSpan() {
+        return consumeConcurrentlyMaxSpan;
+    }
+
+
+    public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
+        this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
+    }
+
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+
+    public int getConsumeMessageBatchMaxSize() {
+        return consumeMessageBatchMaxSize;
+    }
+
+
+    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
+        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public int getConsumeThreadMax() {
+        return consumeThreadMax;
+    }
+
+
+    public void setConsumeThreadMax(int consumeThreadMax) {
+        this.consumeThreadMax = consumeThreadMax;
+    }
+
+
+    public int getConsumeThreadMin() {
+        return consumeThreadMin;
+    }
+
+
+    public void setConsumeThreadMin(int consumeThreadMin) {
+        this.consumeThreadMin = consumeThreadMin;
+    }
+
+
+    public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
+        return defaultMQPushConsumerImpl;
+    }
+
+
+    public MessageListener getMessageListener() {
+        return messageListener;
+    }
+
+
+    public void setMessageListener(MessageListener messageListener) {
+        this.messageListener = messageListener;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public int getPullBatchSize() {
+        return pullBatchSize;
+    }
+
+
+    public void setPullBatchSize(int pullBatchSize) {
+        this.pullBatchSize = pullBatchSize;
+    }
+
+
+    public long getPullInterval() {
+        return pullInterval;
+    }
+
+
+    public void setPullInterval(long pullInterval) {
+        this.pullInterval = pullInterval;
+    }
+
+
+    public int getPullThresholdForQueue() {
+        return pullThresholdForQueue;
+    }
+
+
+    public void setPullThresholdForQueue(int pullThresholdForQueue) {
+        this.pullThresholdForQueue = pullThresholdForQueue;
+    }
+
+
+    public Map<String, String> getSubscription() {
+        return subscription;
+    }
+
+
+    public void setSubscription(Map<String, String> subscription) {
+        this.subscription = subscription;
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, 
brokerName);
+    }
+
+
+    @Override
+    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws 
MQClientException {
+        return 
this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
+    }
+
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQPushConsumerImpl.start();
+    }
+
+
+    @Override
+    public void shutdown() {
+        this.defaultMQPushConsumerImpl.shutdown();
+    }
+
+
+    @Override
+    @Deprecated
+    public void registerMessageListener(MessageListener messageListener) {
+        this.messageListener = messageListener;
+        
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
+    }
+
+
+    @Override
+    public void registerMessageListener(MessageListenerConcurrently 
messageListener) {
+        this.messageListener = messageListener;
+        
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
+    }
+
+
+    @Override
+    public void registerMessageListener(MessageListenerOrderly 
messageListener) {
+        this.messageListener = messageListener;
+        
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
+    }
+
+
+    @Override
+    public void subscribe(String topic, String subExpression) throws 
MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
+    }
+
+
+    @Override
+    public void subscribe(String topic, String fullClassName, String 
filterClassSource) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, 
filterClassSource);
+    }
+
+
+    @Override
+    public void unsubscribe(String topic) {
+        this.defaultMQPushConsumerImpl.unsubscribe(topic);
+    }
+
+
+    @Override
+    public void updateCorePoolSize(int corePoolSize) {
+        this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
+    }
+
+
+    @Override
+    public void suspend() {
+        this.defaultMQPushConsumerImpl.suspend();
+    }
+
+
+    @Override
+    public void resume() {
+        this.defaultMQPushConsumerImpl.resume();
+    }
+
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+
+    public void setOffsetStore(OffsetStore offsetStore) {
+        this.offsetStore = offsetStore;
+    }
+
+
+    public String getConsumeTimestamp() {
+        return consumeTimestamp;
+    }
+
+
+    public void setConsumeTimestamp(String consumeTimestamp) {
+        this.consumeTimestamp = consumeTimestamp;
+    }
+
+
+    public boolean isPostSubscriptionWhenPull() {
+        return postSubscriptionWhenPull;
+    }
+
+
+    public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
+        this.postSubscriptionWhenPull = postSubscriptionWhenPull;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    public long getAdjustThreadPoolNumsThreshold() {
+        return adjustThreadPoolNumsThreshold;
+    }
+
+
+    public void setAdjustThreadPoolNumsThreshold(long 
adjustThreadPoolNumsThreshold) {
+        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
+    }
+
+
+    public int getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+
+    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+
+
+    public long getSuspendCurrentQueueTimeMillis() {
+        return suspendCurrentQueueTimeMillis;
+    }
+
+
+    public void setSuspendCurrentQueueTimeMillis(final long 
suspendCurrentQueueTimeMillis) {
+        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+    }
+
+
+    public long getConsumeTimeout() {
+        return consumeTimeout;
+    }
+
+    public void setConsumeTimeout(final long consumeTimeout) {
+        this.consumeTimeout = consumeTimeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
new file mode 100644
index 0000000..3e26ed6
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.MQAdmin;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Set;
+
+
+/**
+ * Message queue consumer interface
+ *
+ * @author shijia.wxr
+ */
+public interface MQConsumer extends MQAdmin {
+    /**
+     * If consumption fails, the message will be sent back to the broker and 
consumed again after a delay
+     *
+     * @param msg
+     * @param delayLevel
+     *
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     * @throws MQClientException
+     */
+    @Deprecated
+    void sendMessageBack(final MessageExt msg, final int delayLevel) throws 
RemotingException,
+            MQBrokerException, InterruptedException, MQClientException;
+
+
+    /**
+     * If consumption fails, the message will be sent back to the broker and 
consumed again after a delay
+     *
+     * @param msg
+     * @param delayLevel
+     * @param brokerName
+     *
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     * @throws MQClientException
+     */
+    void sendMessageBack(final MessageExt msg, final int delayLevel, final 
String brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
+
+
+    /**
+     * Fetch message queues from consumer cache according to the topic
+     *
+     * @param topic
+     *         message topic
+     *
+     * @return queue set
+     *
+     * @throws MQClientException
+     */
+    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws 
MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
new file mode 100644
index 0000000..d651562
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Set;
+
+
+/**
+ * Pulling consumer interface
+ *
+ * @author shijia.wxr
+ */
+public interface MQPullConsumer extends MQConsumer {
+    /**
+     * Start the consumer
+     *
+     * @throws MQClientException
+     */
+    void start() throws MQClientException;
+
+
+    /**
+     * Shutdown the consumer
+     */
+    void shutdown();
+
+
+    /**
+     * Register the message queue listener
+     *
+     * @param topic
+     * @param listener
+     */
+    void registerMessageQueueListener(final String topic, final 
MessageQueueListener listener);
+
+
+    /**
+     * Pulling the messages,not blocking
+     *
+     * @param mq
+     *         from which message queue
+     * @param subExpression
+     *         subscription expression.it only support or operation such as 
"tag1 || tag2 || tag3" <br>
+     *         if null or * expression,meaning subscribe all
+     * @param offset
+     *         from where to pull
+     * @param maxNums
+     *         max pulling numbers
+     *
+     * @return The resulting {@code PullRequest}
+     *
+     * @throws MQClientException
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     */
+    PullResult pull(final MessageQueue mq, final String subExpression, final 
long offset,
+                    final int maxNums) throws MQClientException, 
RemotingException, MQBrokerException,
+            InterruptedException;
+
+
+    /**
+     * Pulling the messages in the specified timeout
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param timeout
+     *
+     * @return The resulting {@code PullRequest}
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     */
+    PullResult pull(final MessageQueue mq, final String subExpression, final 
long offset,
+                    final int maxNums, final long timeout) throws 
MQClientException, RemotingException,
+            MQBrokerException, InterruptedException;
+
+
+    /**
+     * Pulling the messages in a async. way
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param pullCallback
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    void pull(final MessageQueue mq, final String subExpression, final long 
offset, final int maxNums,
+              final PullCallback pullCallback) throws MQClientException, 
RemotingException,
+            InterruptedException;
+
+    /**
+     * Pulling the messages in a async. way
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param pullCallback
+     * @param timeout
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    void pull(final MessageQueue mq, final String subExpression, final long 
offset, final int maxNums,
+              final PullCallback pullCallback, long timeout) throws 
MQClientException, RemotingException,
+            InterruptedException;
+
+
+    /**
+     * Pulling the messages,if no message arrival,blocking some time
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     *
+     * @return The resulting {@code PullRequest}
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     */
+    PullResult pullBlockIfNotFound(final MessageQueue mq, final String 
subExpression,
+                                   final long offset, final int maxNums) 
throws MQClientException, RemotingException,
+            MQBrokerException, InterruptedException;
+
+
+    /**
+     * Pulling the messages through callback function,if no message 
arrival,blocking.
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param pullCallback
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    void pullBlockIfNotFound(final MessageQueue mq, final String 
subExpression, final long offset,
+                             final int maxNums, final PullCallback 
pullCallback) throws MQClientException, RemotingException,
+            InterruptedException;
+
+
+    /**
+     * Update the offset
+     *
+     * @param mq
+     * @param offset
+     *
+     * @throws MQClientException
+     */
+    void updateConsumeOffset(final MessageQueue mq, final long offset) throws 
MQClientException;
+
+
+    /**
+     * Fetch the offset
+     *
+     * @param mq
+     * @param fromStore
+     *
+     * @return The fetched offset of given queue
+     *
+     * @throws MQClientException
+     */
+    long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) 
throws MQClientException;
+
+
+    /**
+     * Fetch the message queues according to the topic
+     *
+     * @param topic
+     *         message topic
+     *
+     * @return message queue set
+     *
+     * @throws MQClientException
+     */
+    Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws 
MQClientException;
+
+    /**
+     * If consuming failure,message will be send back to the broker,and delay 
consuming in some time later.<br>
+     * Mind! message can only be consumed in the same group.
+     *
+     * @param msg
+     * @param delayLevel
+     * @param brokerName
+     * @param consumerGroup
+     *
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     * @throws MQClientException
+     */
+    void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, 
String consumerGroup)
+            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
new file mode 100644
index 0000000..33cc1c9
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Schedule service for pull consumer.
+ * <p>
+ * Wraps a {@link DefaultMQPullConsumer} and keeps one {@link PullTaskImpl} per message
+ * queue handed over by the message queue listener. Each task runs on a
+ * {@link ScheduledThreadPoolExecutor}, invokes the {@link PullTaskCallback} registered
+ * for the queue's topic and then re-schedules itself with the delay found in the
+ * {@link PullTaskContext}.
+ *
+ * @author shijia.wxr
+ */
+public class MQPullConsumerScheduleService {
+    private final Logger log = ClientLogger.getLog();
+    private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
+    /** Live pull tasks, keyed by the queue they pull from. */
+    private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
+            new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+    private DefaultMQPullConsumer defaultMQPullConsumer;
+    private int pullThreadNums = 20;
+    private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
+            new ConcurrentHashMap<String, PullTaskCallback>();
+    /** Created in {@link #start()}; stays null until then. */
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    /**
+     * Creates the service around a new {@link DefaultMQPullConsumer} of the given group,
+     * with the message model preset to {@link MessageModel#CLUSTERING}.
+     *
+     * @param consumerGroup
+     *         consumer group passed to the wrapped consumer
+     */
+    public MQPullConsumerScheduleService(final String consumerGroup) {
+        this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
+        this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
+    }
+
+    /**
+     * Aligns the running pull tasks of a topic with a new queue set: tasks of that topic
+     * whose queue is no longer in the set are cancelled and removed, and a task is
+     * started for every queue of the set that has none yet.
+     *
+     * @param topic
+     *         topic whose tasks are reconciled
+     * @param mqNewSet
+     *         queues that should be pulled from now on
+     */
+    public void putTask(String topic, Set<MessageQueue> mqNewSet) {
+        Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (next.getKey().getTopic().equals(topic)) {
+                if (!mqNewSet.contains(next.getKey())) {
+                    next.getValue().setCancelled(true);
+                    it.remove();
+                }
+            }
+        }
+
+        for (MessageQueue mq : mqNewSet) {
+            PullTaskImpl command = new PullTaskImpl(mq);
+            // putIfAbsent makes "check and insert" a single atomic step, so two concurrent
+            // callers can never both schedule a task for the same queue.
+            if (this.taskTable.putIfAbsent(mq, command) == null) {
+                this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /**
+     * Creates the pull thread pool, installs the queue listener on the wrapped consumer
+     * and starts that consumer.
+     *
+     * @throws MQClientException
+     *         if starting the wrapped consumer fails
+     */
+    public void start() throws MQClientException {
+        final String group = this.defaultMQPullConsumer.getConsumerGroup();
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+                this.pullThreadNums,
+                new ThreadFactoryImpl("PullMsgThread-" + group)
+        );
+
+        this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);
+
+        this.defaultMQPullConsumer.start();
+
+        log.info("MQPullConsumerScheduleService start OK, {} {}",
+                this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
+    }
+
+    /**
+     * Registers the callback that performs the pull for every queue of a topic.
+     *
+     * @param topic
+     *         topic to pull from
+     * @param callback
+     *         callback invoked by the pull tasks of that topic
+     */
+    public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
+        this.callbackTable.put(topic, callback);
+        // A null listener is passed on purpose: the listener of this service is installed
+        // in start(). NOTE(review): presumably this call only registers the topic - confirm.
+        this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
+    }
+
+    /**
+     * Cancels all pull tasks, then shuts down the thread pool and the wrapped consumer.
+     */
+    public void shutdown() {
+        // ScheduledThreadPoolExecutor still runs already-delayed tasks after shutdown() by
+        // default. Flag every task first so a pending one becomes a no-op instead of
+        // invoking the callback against a consumer that has been shut down.
+        for (PullTaskImpl task : this.taskTable.values()) {
+            task.setCancelled(true);
+        }
+        this.taskTable.clear();
+
+        if (this.scheduledThreadPoolExecutor != null) {
+            this.scheduledThreadPoolExecutor.shutdown();
+        }
+
+        if (this.defaultMQPullConsumer != null) {
+            this.defaultMQPullConsumer.shutdown();
+        }
+    }
+
+    public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() {
+        return callbackTable;
+    }
+
+    public void setCallbackTable(ConcurrentHashMap<String, PullTaskCallback> callbackTable) {
+        this.callbackTable = callbackTable;
+    }
+
+    public int getPullThreadNums() {
+        return pullThreadNums;
+    }
+
+    /**
+     * Sets the size of the pull thread pool; it is read when {@link #start()} creates the pool.
+     *
+     * @param pullThreadNums
+     *         number of pull threads
+     */
+    public void setPullThreadNums(int pullThreadNums) {
+        this.pullThreadNums = pullThreadNums;
+    }
+
+    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
+        return defaultMQPullConsumer;
+    }
+
+    public void setDefaultMQPullConsumer(DefaultMQPullConsumer defaultMQPullConsumer) {
+        this.defaultMQPullConsumer = defaultMQPullConsumer;
+    }
+
+    public MessageModel getMessageModel() {
+        return this.defaultMQPullConsumer.getMessageModel();
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.defaultMQPullConsumer.setMessageModel(messageModel);
+    }
+
+    /**
+     * Turns queue changes reported by the wrapped consumer into task updates.
+     */
+    class MessageQueueListenerImpl implements MessageQueueListener {
+        @Override
+        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+            MessageModel messageModel =
+                    MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
+            switch (messageModel) {
+                case BROADCASTING:
+                    // Broadcasting: pull from every queue of the topic.
+                    MQPullConsumerScheduleService.this.putTask(topic, mqAll);
+                    break;
+                case CLUSTERING:
+                    // Clustering: pull only from the queues divided to this consumer.
+                    MQPullConsumerScheduleService.this.putTask(topic, mqDivided);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Self-rescheduling task that pulls from one message queue until it is cancelled.
+     */
+    class PullTaskImpl implements Runnable {
+        private final MessageQueue messageQueue;
+        private volatile boolean cancelled = false;
+
+        public PullTaskImpl(final MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        /**
+         * Invokes the callback registered for the queue's topic and, unless the task has
+         * been cancelled in the meantime, schedules the next run.
+         */
+        @Override
+        public void run() {
+            String topic = this.messageQueue.getTopic();
+            if (!this.isCancelled()) {
+                PullTaskCallback pullTaskCallback =
+                        MQPullConsumerScheduleService.this.callbackTable.get(topic);
+                if (pullTaskCallback != null) {
+                    final PullTaskContext context = new PullTaskContext();
+                    context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
+                    try {
+                        pullTaskCallback.doPullTask(this.messageQueue, context);
+                    } catch (Throwable e) {
+                        // A failing callback must not end the pull loop: back off and retry.
+                        context.setPullNextDelayTimeMillis(1000);
+                        log.error("doPullTask Exception", e);
+                    }
+
+                    // Checked again because the task may have been cancelled while the
+                    // callback was running.
+                    if (!this.isCancelled()) {
+                        MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
+                                context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
+                    } else {
+                        log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+                    }
+                } else {
+                    log.warn("Pull Task Callback not exist , {}", topic);
+                }
+            } else {
+                log.warn("The Pull Task is cancelled, {}", messageQueue);
+            }
+        }
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        public void setCancelled(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
new file mode 100644
index 0000000..982c839
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+
+
+/**
+ * Push consumer
+ *
+ * @author shijia.wxr
+ */
+public interface MQPushConsumer extends MQConsumer {
+    /**
+     * Start the consumer
+     *
+     * @throws MQClientException
+     */
+    void start() throws MQClientException;
+
+
+    /**
+     * Shutdown the consumer
+     */
+    void shutdown();
+
+
+    /**
+     * Register the message listener
+     *
+     * @param messageListener
+     */
+    @Deprecated
+    void registerMessageListener(MessageListener messageListener);
+
+
+    void registerMessageListener(final MessageListenerConcurrently 
messageListener);
+
+
+    void registerMessageListener(final MessageListenerOrderly messageListener);
+
+
+    /**
+     * Subscribe some topic
+     *
+     * @param topic
+     * @param subExpression
+     *         subscription expression.it only support or operation such as
+     *         "tag1 || tag2 || tag3" <br>
+     *         if null or * expression,meaning subscribe all
+     *
+     * @throws MQClientException
+     */
+    void subscribe(final String topic, final String subExpression) throws 
MQClientException;
+
+
+    /**
+     * Subscribe some topic
+     *
+     * @param topic
+     * @param fullClassName
+     *         full class name,must extend
+     *         org.apache.rocketmq.common.filter. MessageFilter
+     * @param filterClassSource
+     *         class source code,used UTF-8 file encoding,must be responsible
+     *         for your code safety
+     *
+     * @throws MQClientException
+     */
+    void subscribe(final String topic, final String fullClassName, final 
String filterClassSource) throws MQClientException;
+
+
+    /**
+     * Unsubscribe consumption some topic
+     *
+     * @param topic
+     *         message topic
+     */
+    void unsubscribe(final String topic);
+
+
+    /**
+     * Update the consumer thread pool size Dynamically
+     *
+     * @param corePoolSize
+     */
+    void updateCorePoolSize(int corePoolSize);
+
+
+    /**
+     * Suspend the consumption
+     */
+    void suspend();
+
+
+    /**
+     * Resume the consumption
+     */
+    void resume();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
new file mode 100644
index 0000000..e59a3ce
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.Set;
+
+
+/**
+ * Listener implemented by the application to be told when the message queues
+ * of a topic have changed.
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public interface MessageQueueListener {
+    /**
+     * Called when the message queues of a topic have changed.
+     *
+     * @param topic
+     *         message topic
+     * @param mqAll
+     *         all queues in this message topic
+     * @param mqDivided
+     *         collection of queues assigned to the current consumer
+     */
+    void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
new file mode 100644
index 0000000..2429d5a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+/**
+ * Callback interface for asynchronous message pulling.
+ *
+ * @author shijia.wxr
+ */
+public interface PullCallback {
+    /**
+     * Receives the result of a pull.
+     *
+     * @param pullResult
+     *         the result of the pull
+     */
+    void onSuccess(PullResult pullResult);
+
+    /**
+     * Receives the failure of a pull.
+     *
+     * @param e
+     *         the cause of the failure
+     */
+    void onException(Throwable e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
new file mode 100644
index 0000000..81dd497
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * Result of a pull: a {@link PullStatus}, three offsets and the list of messages found.
+ * Only the message list can be replaced after construction.
+ *
+ * @author shijia.wxr
+ */
+public class PullResult {
+    private final PullStatus pullStatus;
+    private final long nextBeginOffset;
+    private final long minOffset;
+    private final long maxOffset;
+    private List<MessageExt> msgFoundList;
+
+    /**
+     * Stores the given values as they are.
+     *
+     * @param pullStatus
+     *         status of the pull
+     * @param nextBeginOffset
+     *         next begin offset
+     * @param minOffset
+     *         min offset
+     * @param maxOffset
+     *         max offset
+     * @param msgFoundList
+     *         messages found, may be null
+     */
+    public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
+                      List<MessageExt> msgFoundList) {
+        this.msgFoundList = msgFoundList;
+        this.maxOffset = maxOffset;
+        this.minOffset = minOffset;
+        this.nextBeginOffset = nextBeginOffset;
+        this.pullStatus = pullStatus;
+    }
+
+    public PullStatus getPullStatus() {
+        return this.pullStatus;
+    }
+
+    public long getNextBeginOffset() {
+        return this.nextBeginOffset;
+    }
+
+    public long getMinOffset() {
+        return this.minOffset;
+    }
+
+    public long getMaxOffset() {
+        return this.maxOffset;
+    }
+
+    public List<MessageExt> getMsgFoundList() {
+        return this.msgFoundList;
+    }
+
+    public void setMsgFoundList(List<MessageExt> msgFoundList) {
+        this.msgFoundList = msgFoundList;
+    }
+
+    /**
+     * Renders the status and offsets; the message list is represented by its size
+     * only (0 when the list is null).
+     */
+    @Override
+    public String toString() {
+        final int foundCount;
+        if (msgFoundList == null) {
+            foundCount = 0;
+        } else {
+            foundCount = msgFoundList.size();
+        }
+
+        final StringBuilder text = new StringBuilder();
+        text.append("PullResult [pullStatus=").append(pullStatus);
+        text.append(", nextBeginOffset=").append(nextBeginOffset);
+        text.append(", minOffset=").append(minOffset);
+        text.append(", maxOffset=").append(maxOffset);
+        text.append(", msgFoundList=").append(foundCount);
+        text.append("]");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
new file mode 100644
index 0000000..b63a4c4
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+/**
+ * Status of a pull, as carried by {@code PullResult}.
+ *
+ * @author shijia.wxr
+ */
+public enum PullStatus {
+    /**
+     * Messages were found
+     */
+    FOUND,
+    /**
+     * No new message can be pulled
+     */
+    NO_NEW_MSG,
+    /**
+     * The filtering result matched no message
+     */
+    NO_MATCHED_MSG,
+    /**
+     * Illegal offset, may be too big or too small
+     */
+    OFFSET_ILLEGAL
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
new file mode 100644
index 0000000..f0e9b25
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Callback that performs the pull for one message queue on behalf of
+ * {@code MQPullConsumerScheduleService}.
+ */
+public interface PullTaskCallback {
+    /**
+     * Performs one pull round for the given queue.
+     *
+     * @param mq
+     *         the queue to pull from
+     * @param context
+     *         carries the pull consumer to use and the delay before the next round
+     */
+    void doPullTask(MessageQueue mq, PullTaskContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
new file mode 100644
index 0000000..ba66a1f
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+/**
+ * Context handed to a {@code PullTaskCallback}: the pull consumer to use and the
+ * delay, in milliseconds, before the next pull round.
+ */
+public class PullTaskContext {
+
+    private MQPullConsumer pullConsumer;
+
+    /** Delay before the next pull round; 200 unless changed. */
+    private int pullNextDelayTimeMillis = 200;
+
+    public MQPullConsumer getPullConsumer() {
+        return this.pullConsumer;
+    }
+
+    public void setPullConsumer(MQPullConsumer pullConsumer) {
+        this.pullConsumer = pullConsumer;
+    }
+
+    public int getPullNextDelayTimeMillis() {
+        return this.pullNextDelayTimeMillis;
+    }
+
+    public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {
+        this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
new file mode 100644
index 0000000..03223ba
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Context object handed to a concurrent message listener together with the
+ * batch of messages it has to consume.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeConcurrentlyContext {
+    /** Queue this context was created for; fixed at construction time. */
+    private final MessageQueue messageQueue;
+
+    /**
+     * Starts at {@link Integer#MAX_VALUE}.
+     * NOTE(review): how the index is interpreted is decided by the code that
+     * reads it, which is not part of this class.
+     */
+    private int ackIndex = Integer.MAX_VALUE;
+
+    /**
+     * Message consume retry strategy:
+     * <ul>
+     * <li>-1: no retry, put into DLQ directly</li>
+     * <li>0: broker controls the retry frequency (initial value)</li>
+     * <li>&gt;0: client controls the retry frequency</li>
+     * </ul>
+     */
+    private int delayLevelWhenNextConsume = 0;
+
+
+    /**
+     * @param messageQueue the queue this context refers to
+     */
+    public ConsumeConcurrentlyContext(final MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    /**
+     * @return the queue supplied to the constructor
+     */
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+
+    /**
+     * @return the current retry delay level
+     */
+    public int getDelayLevelWhenNextConsume() {
+        return this.delayLevelWhenNextConsume;
+    }
+
+
+    /**
+     * @param delayLevelWhenNextConsume the retry delay level to store
+     */
+    public void setDelayLevelWhenNextConsume(final int delayLevelWhenNextConsume) {
+        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+    }
+
+
+    /**
+     * @return the current ack index
+     */
+    public int getAckIndex() {
+        return this.ackIndex;
+    }
+
+
+    /**
+     * @param ackIndex the ack index to store
+     */
+    public void setAckIndex(final int ackIndex) {
+        this.ackIndex = ackIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
new file mode 100644
index 0000000..433ce36
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+/**
+ * Outcome that a {@link MessageListenerConcurrently} reports for the batch of
+ * messages it was asked to consume.
+ *
+ * @author shijia.wxr
+ */
+public enum ConsumeConcurrentlyStatus {
+    /**
+     * The messages were consumed successfully.
+     */
+    CONSUME_SUCCESS,
+    /**
+     * Consumption failed; try to consume the messages again later.
+     */
+    RECONSUME_LATER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
new file mode 100644
index 0000000..2adeb29
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Context object handed to an orderly message listener together with the
+ * batch of messages it has to consume.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeOrderlyContext {
+    /** Queue this context was created for; fixed at construction time. */
+    private final MessageQueue messageQueue;
+
+    /**
+     * Starts at -1.
+     * NOTE(review): presumably a non-positive value means "no explicit suspend
+     * time requested" -- confirm with the code that reads this field.
+     */
+    private long suspendCurrentQueueTimeMillis = -1;
+
+    /** Auto-commit flag; enabled unless switched off through the setter. */
+    private boolean autoCommit = true;
+
+
+    /**
+     * @param messageQueue the queue this context refers to
+     */
+    public ConsumeOrderlyContext(final MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    /**
+     * @return the queue supplied to the constructor
+     */
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+
+    /**
+     * @return {@code true} while auto-commit is enabled
+     */
+    public boolean isAutoCommit() {
+        return this.autoCommit;
+    }
+
+
+    /**
+     * @param autoCommit the new auto-commit flag
+     */
+    public void setAutoCommit(final boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
+
+
+    /**
+     * @return the stored suspend time in milliseconds (-1 unless changed)
+     */
+    public long getSuspendCurrentQueueTimeMillis() {
+        return this.suspendCurrentQueueTimeMillis;
+    }
+
+
+    /**
+     * @param suspendCurrentQueueTimeMillis the suspend time in milliseconds
+     */
+    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
+        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
new file mode 100644
index 0000000..7da0b1f
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+/**
+ * Outcome that a {@link MessageListenerOrderly} reports for the batch of
+ * messages it was asked to consume.
+ *
+ * @author shijia.wxr
+ */
+public enum ConsumeOrderlyStatus {
+    /**
+     * The messages were consumed successfully.
+     */
+    SUCCESS,
+    /**
+     * Roll back consumption (only for binlog consumption).
+     *
+     * @deprecated marked deprecated in the source; no replacement is named here
+     */
+    @Deprecated
+    ROLLBACK,
+    /**
+     * Commit the offset (only for binlog consumption).
+     *
+     * @deprecated marked deprecated in the source; no replacement is named here
+     */
+    @Deprecated
+    COMMIT,
+    /**
+     * Suspend the current queue for a moment.
+     */
+    SUSPEND_CURRENT_QUEUE_A_MOMENT;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
new file mode 100644
index 0000000..82570ab
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer.listener;
+
+/**
+ * Classifies how a consume call ended.
+ * <p>
+ * Created by alvin on 16-11-30.
+ */
+public enum ConsumeReturnType {
+    /**
+     * The consume call returned success.
+     */
+    SUCCESS,
+    /**
+     * The consume call timed out, even if it succeeded.
+     */
+    TIME_OUT,
+    /**
+     * The consume call threw an exception.
+     */
+    EXCEPTION,
+    /**
+     * The consume call returned null.
+     */
+    RETURNNULL,
+    /**
+     * The consume call returned a failure result.
+     */
+    FAILED
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
new file mode 100644
index 0000000..adc2651
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+/**
+ * A MessageListener object is used to receive asynchronously delivered
+ * messages.
+ * <p>
+ * This interface declares no methods itself; the consume callbacks are
+ * declared by the interfaces that extend it, such as
+ * {@link MessageListenerConcurrently} and {@link MessageListenerOrderly}.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageListener {
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
new file mode 100644
index 0000000..3df6cc2
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * A MessageListenerConcurrently object is used to receive asynchronously
+ * delivered messages concurrently.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageListenerConcurrently extends MessageListener {
+    /**
+     * Consumes a batch of messages and reports the outcome.
+     * <p>
+     * Throwing an exception is not recommended; return
+     * {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} instead when
+     * consumption fails.
+     *
+     * @param msgs
+     *         the batch to consume, msgs.size() &gt;= 1<br>
+     *         The batch size follows
+     *         DefaultMQPushConsumer.consumeMessageBatchMaxSize (noted as 1
+     *         by the original author) and can be modified there.
+     * @param context
+     *         the consumption context passed alongside the batch
+     *
+     * @return The consume status
+     */
+    ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
+                                             final ConsumeConcurrentlyContext 
context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
new file mode 100644
index 0000000..d1b6c79
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.listener;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * A MessageListenerOrderly object is used to receive asynchronously
+ * delivered messages orderly: one queue, one thread.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageListenerOrderly extends MessageListener {
+    /**
+     * Consumes a batch of messages and reports the outcome.
+     * <p>
+     * Throwing an exception is not recommended; return
+     * {@link ConsumeOrderlyStatus#SUSPEND_CURRENT_QUEUE_A_MOMENT} instead
+     * when consumption fails.
+     *
+     * @param msgs
+     *         the batch to consume, msgs.size() &gt;= 1<br>
+     *         The batch size follows
+     *         DefaultMQPushConsumer.consumeMessageBatchMaxSize (noted as 1
+     *         by the original author) and can be modified there.
+     * @param context
+     *         the consumption context passed alongside the batch
+     *
+     * @return The consume status
+     */
+    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
+                                        final ConsumeOrderlyContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
new file mode 100644
index 0000000..747df83
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Average Hashing queue algorithm.
+ * <p>
+ * Each consumer receives one contiguous slice of the queue list. When the
+ * queue count is not a multiple of the consumer count, the consumers at the
+ * lowest positions in {@code cidAll} receive one extra queue each. For
+ * example, 8 queues shared by 3 consumers are split as [0..2], [3..5], [6..7].
+ *
+ * @author manhong.yqd
+ */
+public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
+    private final Logger log = ClientLogger.getLog();
+
+    /**
+     * Picks the slice of {@code mqAll} that belongs to {@code currentCID}.
+     *
+     * @param consumerGroup consumer group name, only used in the log message
+     * @param currentCID id of the consumer asking for its share
+     * @param mqAll all queues to distribute
+     * @param cidAll ids of all consumers; a consumer's position in this list
+     *         decides which slice it gets
+     *
+     * @return the queues assigned to {@code currentCID}; empty when the id is
+     *         not in {@code cidAll} or when there are fewer queues than
+     *         consumers and this consumer's position is past the last queue
+     *
+     * @throws IllegalArgumentException if {@code currentCID} is null or empty,
+     *         or if {@code mqAll} or {@code cidAll} is null or empty
+     */
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+                                       List<String> cidAll) {
+        if (currentCID == null || currentCID.isEmpty()) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        final List<MessageQueue> assigned = new ArrayList<MessageQueue>();
+
+        // One lookup answers both "is it a member?" and "at which position?".
+        final int position = cidAll.indexOf(currentCID);
+        if (position < 0) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                    consumerGroup,
+                    currentCID,
+                    cidAll);
+            return assigned;
+        }
+
+        final int queueCount = mqAll.size();
+        final int consumerCount = cidAll.size();
+        final int remainder = queueCount % consumerCount;
+        // The first `remainder` consumers each absorb one of the left-over queues.
+        final boolean takesExtra = remainder > 0 && position < remainder;
+
+        final int share;
+        if (queueCount <= consumerCount) {
+            share = 1;
+        } else if (takesExtra) {
+            share = queueCount / consumerCount + 1;
+        } else {
+            share = queueCount / consumerCount;
+        }
+
+        // Consumers past the "extra" group are shifted by the number of extras
+        // already handed out before them.
+        int first = position * share;
+        if (!takesExtra) {
+            first += remainder;
+        }
+
+        // Goes non-positive when this consumer sits beyond the last queue,
+        // in which case the loop below adds nothing.
+        final int count = Math.min(share, queueCount - first);
+        for (int offset = 0; offset < count; offset++) {
+            assigned.add(mqAll.get((first + offset) % queueCount));
+        }
+        return assigned;
+    }
+
+    /**
+     * @return the strategy name, {@code "AVG"}
+     */
+    @Override
+    public String getName() {
+        return "AVG";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
new file mode 100644
index 0000000..d6ab041
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Cycle average Hashing queue algorithm.
+ * <p>
+ * Queues are dealt out round-robin: the consumer at position {@code p} in
+ * {@code cidAll} receives the queues at indices {@code p}, {@code p + n},
+ * {@code p + 2n}, ... where {@code n} is the consumer count. For example,
+ * 8 queues shared by 3 consumers are split as [0,3,6], [1,4,7], [2,5].
+ *
+ * @author manhong.yqd
+ */
+public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
+    private final Logger log = ClientLogger.getLog();
+
+    /**
+     * Picks the queues of {@code mqAll} that belong to {@code currentCID}.
+     *
+     * @param consumerGroup consumer group name, only used in the log message
+     * @param currentCID id of the consumer asking for its share
+     * @param mqAll all queues to distribute
+     * @param cidAll ids of all consumers; a consumer's position in this list
+     *         decides which queues it gets
+     *
+     * @return the queues assigned to {@code currentCID}; empty when the id is
+     *         not in {@code cidAll} or when its position is past the last queue
+     *
+     * @throws IllegalArgumentException if {@code currentCID} is null or empty,
+     *         or if {@code mqAll} or {@code cidAll} is null or empty
+     */
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+                                       List<String> cidAll) {
+        if (currentCID == null || currentCID.isEmpty()) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        final List<MessageQueue> assigned = new ArrayList<MessageQueue>();
+
+        // One lookup answers both "is it a member?" and "at which position?".
+        final int position = cidAll.indexOf(currentCID);
+        if (position < 0) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                    consumerGroup,
+                    currentCID,
+                    cidAll);
+            return assigned;
+        }
+
+        final int queueCount = mqAll.size();
+        final int consumerCount = cidAll.size();
+        int cursor = position;
+        while (cursor < queueCount) {
+            // Keep only the indices that land on this consumer's slot in the cycle.
+            if (cursor % consumerCount == position) {
+                assigned.add(mqAll.get(cursor));
+            }
+            cursor++;
+        }
+        return assigned;
+    }
+
+    /**
+     * @return the strategy name, {@code "AVG_BY_CIRCLE"}
+     */
+    @Override
+    public String getName() {
+        return "AVG_BY_CIRCLE";
+    }
+}


Reply via email to