This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new cc1dfb2  Add Push Consumer (#47)
cc1dfb2 is described below

commit cc1dfb23aab9d8f18b21b587b0877ac2542b866b
Author: wenfeng <[email protected]>
AuthorDate: Fri Apr 26 14:00:48 2019 +0800

    Add Push Consumer (#47)
    
    * add push consumer
    
    * add strategy
    
    * push consumer done
    
    * test push consumer
    
    * fix push_consumer fatal error
    
    * add license
---
 config.go                                        |   1 +
 consumer.go                                      | 302 ---------
 consumer/consumer.go                             | 830 +++++++++++++++++++++++
 consumer/offset_store.go                         |  55 ++
 consumer/process_queue.go                        | 100 +++
 consumer/pull_consumer.go                        | 198 ++++++
 consumer/pull_consumer_test.go                   |  18 +
 consumer/push_consumer.go                        | 606 +++++++++++++++++
 consumer/push_consumer_test.go                   |  18 +
 consumer/statistics.go                           |  60 ++
 consumer/strategy.go                             | 130 ++++
 consumer_test.go                                 |  18 -
 examples/main.go                                 |  73 --
 examples/producer.go                             |  50 --
 examples/producer/main.go                        |  47 ++
 examples/producer_orderly.go                     |  75 --
 examples/pull_consumer.go                        |  74 --
 examples/push_consumer.go                        |  59 --
 go.mod                                           |   1 +
 go.sum                                           |   2 +
 kernel/client.go                                 | 347 ++++++++--
 kernel/client_test.go                            |  45 ++
 kernel/constants.go                              |  27 +
 kernel/message.go                                |  54 +-
 kernel/model.go                                  | 129 ++--
 kernel/perm.go                                   |  30 +-
 kernel/request.go                                |  49 +-
 kernel/route.go                                  | 148 ++--
 kernel/validators.go                             |  46 ++
 remote/codes.go                                  | 147 ----
 remote/{client.go => remote_client.go}           |  33 +-
 remote/{client_test.go => remote_client_test.go} |   0
 rlog/log.go                                      |  10 -
 utils/helper.go                                  |  13 +-
 utils/helper_test.go                             |   4 +-
 utils/math.go                                    |  32 +
 utils/string.go                                  |  40 ++
 37 files changed, 2773 insertions(+), 1098 deletions(-)

diff --git a/config.go b/config.go
new file mode 100644
index 0000000..5519947
--- /dev/null
+++ b/config.go
@@ -0,0 +1 @@
+package rocketmq
diff --git a/consumer.go b/consumer.go
deleted file mode 100644
index 5119f93..0000000
--- a/consumer.go
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package rocketmq
-
-import (
-       "context"
-       "errors"
-       "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/rlog"
-       "strconv"
-       "sync"
-       "sync/atomic"
-       "time"
-)
-
-type Consumer interface {
-       Start()
-       Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
-       SubscribeWithChan(topic, expression string) (chan *kernel.Message, 
error)
-       SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) 
ConsumeResult) error
-       ACK(msg *kernel.Message, result ConsumeResult)
-}
-
-var (
-       queueCounterTable sync.Map
-)
-
-type ConsumeResult int
-
-type ConsumerType int
-
-const (
-       Original ConsumerType = iota
-       Orderly
-       Transaction
-
-       SubAll = "*"
-)
-
-type ConsumerConfig struct {
-       GroupName                  string
-       Model                      kernel.MessageModel
-       UnitMode                   bool
-       MaxReconsumeTimes          int
-       PullMessageTimeout         time.Duration
-       FromWhere                  kernel.ConsumeFromWhere
-       brokerSuspendMaxTimeMillis int64
-}
-
-func NewConsumer(config ConsumerConfig) Consumer {
-       return &defaultConsumer{
-               config: config,
-       }
-}
-
-type defaultConsumer struct {
-       state  kernel.ServiceState
-       config ConsumerConfig
-}
-
-func (c *defaultConsumer) Start() {
-       c.state = kernel.Running
-}
-
-func (c *defaultConsumer) Pull(topic, expression string, numbers int) 
(*kernel.PullResult, error) {
-       mq := getNextQueueOf(topic)
-       if mq == nil {
-               return nil, fmt.Errorf("prepard to pull topic: %s, but no queue 
is founded", topic)
-       }
-
-       data := getSubscriptionData(mq, expression)
-       result, err := c.pull(context.Background(), mq, data, 
c.nextOffsetOf(mq), numbers)
-
-       if err != nil {
-               return nil, err
-       }
-
-       processPullResult(mq, result, data)
-       return result, nil
-}
-
-// SubscribeWithChan ack manually
-func (c *defaultConsumer) SubscribeWithChan(topic, expression string) (chan 
*kernel.Message, error) {
-       return nil, nil
-}
-
-// SubscribeWithFunc ack automatic
-func (c *defaultConsumer) SubscribeWithFunc(topic, expression string,
-       f func(msg *kernel.Message) ConsumeResult) error {
-       return nil
-}
-
-func (c *defaultConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
-
-}
-
-func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, 
data *kernel.SubscriptionData,
-       offset int64, numbers int) (*kernel.PullResult, error) {
-       err := c.makeSureStateOK()
-       if err != nil {
-               return nil, err
-       }
-
-       if mq == nil {
-               return nil, errors.New("MessageQueue is nil")
-       }
-
-       if offset < 0 {
-               return nil, errors.New("offset < 0")
-       }
-
-       if numbers <= 0 {
-               numbers = 1
-       }
-       c.subscriptionAutomatically(mq.Topic)
-
-       brokerResult := tryFindBroker(mq)
-       if brokerResult == nil {
-               return nil, fmt.Errorf("the broker %s does not exist", 
mq.BrokerName)
-       }
-
-       if (data.ExpType == kernel.TAG) && brokerResult.BrokerVersion < 
kernel.V4_1_0 {
-               return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to 
support for filter message by %v",
-                       mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
-       }
-
-       sysFlag := buildSysFlag(false, true, true, false)
-
-       if brokerResult.Slave {
-               sysFlag = clearCommitOffsetFlag(sysFlag)
-       }
-       pullRequest := &kernel.PullMessageRequest{
-               ConsumerGroup:        c.config.GroupName,
-               Topic:                mq.Topic,
-               QueueId:              int32(mq.QueueId),
-               QueueOffset:          offset,
-               MaxMsgNums:           int32(numbers),
-               SysFlag:              sysFlag,
-               CommitOffset:         0,
-               SuspendTimeoutMillis: c.config.brokerSuspendMaxTimeMillis,
-               SubExpression:        data.SubString,
-               ExpressionType:       string(data.ExpType),
-       }
-
-       if data.ExpType == kernel.TAG {
-               pullRequest.SubVersion = 0
-       } else {
-               pullRequest.SubVersion = data.SubVersion
-       }
-
-       // TODO computePullFromWhichFilterServer
-       return kernel.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
-}
-
-func (c *defaultConsumer) makeSureStateOK() error {
-       if c.state != kernel.Running {
-               return fmt.Errorf("the consumer state is [%d], not running", 
c.state)
-       }
-       return nil
-}
-
-func (c *defaultConsumer) subscriptionAutomatically(topic string) {
-       // TODO
-}
-
-func (c *defaultConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
-       return 0
-}
-
-func toMessage(messageExts []*kernel.MessageExt) []*kernel.Message {
-       msgs := make([]*kernel.Message, 0)
-
-       return msgs
-}
-
-func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, 
data *kernel.SubscriptionData) {
-       updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
-       switch result.Status {
-       case kernel.PullFound:
-               msgs := result.GetMessageExts()
-               msgListFilterAgain := msgs
-               if len(data.Tags) > 0 && data.ClassFilterMode {
-                       msgListFilterAgain = make([]*kernel.MessageExt, 
len(msgs))
-                       for _, msg := range msgs {
-                               _, exist := data.Tags[msg.GetTags()]
-                               if exist {
-                                       msgListFilterAgain = 
append(msgListFilterAgain, msg)
-                               }
-                       }
-               }
-
-               // TODO hook
-
-               for _, msg := range msgListFilterAgain {
-                       traFlag, _ := 
strconv.ParseBool(msg.Properties[kernel.TransactionPrepared])
-                       if traFlag {
-                               msg.TransactionId = 
msg.Properties[kernel.UniqueClientMessageIdKeyIndex]
-                       }
-
-                       msg.Properties[kernel.MinOffset] = 
strconv.FormatInt(result.MinOffset, 10)
-                       msg.Properties[kernel.MaxOffset] = 
strconv.FormatInt(result.MaxOffset, 10)
-               }
-
-               result.SetMessageExts(msgListFilterAgain)
-       }
-}
-
-func getSubscriptionData(mq *kernel.MessageQueue, exp string) 
*kernel.SubscriptionData {
-       subData := &kernel.SubscriptionData{
-               Topic: mq.Topic,
-       }
-       if exp == "" || exp == SubAll {
-               subData.SubString = SubAll
-       } else {
-               // TODO
-       }
-       return subData
-}
-
-func getNextQueueOf(topic string) *kernel.MessageQueue {
-       queues, err := kernel.FetchSubscribeMessageQueues(topic)
-       if err != nil && len(queues) > 0 {
-               rlog.Error(err.Error())
-               return nil
-       }
-       var index int64
-       v, exist := queueCounterTable.Load(topic)
-       if !exist {
-               index = -1
-               queueCounterTable.Store(topic, 0)
-       } else {
-               index = v.(int64)
-       }
-
-       return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
-}
-
-func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 
{
-       var flag int32 = 0
-       if commitOffset {
-               flag |= 0x1 << 0
-       }
-
-       if suspend {
-               flag |= 0x1 << 1
-       }
-
-       if subscription {
-               flag |= 0x1 << 2
-       }
-
-       if classFilter {
-               flag |= 0x1 << 3
-       }
-
-       return flag
-}
-
-func clearCommitOffsetFlag(sysFlag int32) int32 {
-       return sysFlag & (^0x1 << 0)
-}
-
-func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
-       result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
recalculatePullFromWhichNode(mq), false)
-
-       if result == nil {
-               kernel.UpdateTopicRouteInfo(mq.Topic)
-       }
-       return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
recalculatePullFromWhichNode(mq), false)
-}
-
-var (
-       pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
-       pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
-}
-
-func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
-       v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
-       if exist {
-               return v.(int64)
-       }
-       return kernel.MasterId
-}
diff --git a/consumer/consumer.go b/consumer/consumer.go
new file mode 100644
index 0000000..60d3dd5
--- /dev/null
+++ b/consumer/consumer.go
@@ -0,0 +1,830 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+       "github.com/tidwall/gjson"
+       "sort"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+const (
+       // Delay applied when an exception/error occurs
+       _PullDelayTimeWhenError = 3 * time.Second
+
+       // Flow control interval
+       _PullDelayTimeWhenFlowControl = 50 * time.Millisecond
+
+       // Delay applied when the pull service is suspended
+       _PullDelayTimeWhenSuspend = 30 * time.Second
+
+       // Long polling mode, the Consumer connection max suspend time
+       _BrokerSuspendMaxTime = 20 * time.Second
+
+       // Long polling mode, the Consumer connection timeout (must be greater 
than _BrokerSuspendMaxTime)
+       _ConsumerTimeoutWhenSuspend = 30 * time.Second
+
+       // Interval at which the consumer persists its offsets
+       _PersistConsumerOffsetInterval = 5 * time.Second
+)
+
+// MessageModel defines how messages are delivered to the consumer 
clients.
+//
+//
+// RocketMQ supports two message models: clustering and broadcasting. If 
clustering is set, consumer clients with
+// the same consumer group would only consume shards of the messages 
subscribed, which achieves load
+// balancing; conversely, if broadcasting is set, each consumer client will 
consume all subscribed messages
+// separately.
+//
+// The comment this was derived from states that the model defaults to clustering.
+// NOTE(review): BroadCasting is iota 0 here, so the Go zero value is BroadCasting, not Clustering; confirm where a clustering default is applied.
+type MessageModel int
+
+const (
+       BroadCasting MessageModel = iota
+       Clustering
+)
+
+func (mode MessageModel) String() string {
+       switch mode {
+       case BroadCasting:
+               return "BroadCasting"
+       case Clustering:
+               return "Clustering"
+       default:
+               return "Unknown"
+       }
+}
+
+// ConsumeFromWhere is the consuming point on consumer booting.
+//
+//
+// There are three consuming points:
+// <ul>
+// <li>
+// <code>ConsumeFromLastOffset</code>: consumer clients pick up where it 
stopped previously.
+// If it were a newly booting up consumer client, according aging of the 
consumer group, there are two
+// cases:
+// <ol>
+// <li>
+// if the consumer group is created so recently that the earliest message 
being subscribed has yet
+// to expire, which means the consumer group represents a lately launched 
business, consuming will
+// start from the very beginning;
+// </li>
+// <li>
+// if the earliest message being subscribed has expired, consuming will start 
from the latest
+// messages, meaning messages born prior to the booting timestamp would be 
ignored.
+// </li>
+// </ol>
+// </li>
+// <li>
+// <code>ConsumeFromFirstOffset</code>: Consumer client will start from 
earliest messages available.
+// </li>
+// <li>
+// <code>ConsumeFromTimestamp</code>: Consumer client will start from 
specified timestamp, which means
+// messages born prior to that timestamp will be ignored
+// </li>
+// </ul>
+type ConsumeFromWhere int
+
+const (
+       ConsumeFromLastOffset ConsumeFromWhere = iota
+       ConsumeFromFirstOffset
+       ConsumeFromTimestamp
+)
+
+type ConsumeType string
+
+const (
+       _PullConsume = ConsumeType("pull")
+       _PushConsume = ConsumeType("push")
+)
+
+type ExpressionType string
+
+const (
+       /**
+        * SQL92 is the expression type whose filter grammar is described below.
+        *
+        * Keywords:
+        *   AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL
+        *
+        * Data types:
+        *   Boolean, like: TRUE, FALSE
+        *   String, like: 'abc'
+        *   Decimal, like: 123
+        *   Float number, like: 3.1415
+        *
+        * Grammar:
+        *   AND, OR
+        *   >, >=, <, <=, =
+        *   BETWEEN A AND B, equals to >=A AND <=B
+        *   NOT BETWEEN A AND B, equals to >B OR <A
+        *   IN ('a', 'b'), equals to ='a' OR ='b'; this operation only supports the String type
+        *   IS NULL, IS NOT NULL: check whether a parameter is null or not
+        *   =TRUE, =FALSE: check whether a parameter is true or false
+        *
+        * Example:
+        *   (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+        */
+       SQL92 = ExpressionType("SQL92")
+
+       /**
+        * TAG only supports the "or" operation, such as "tag1 || tag2 || tag3".
+        * A null or "*" expression means subscribe to all.
+        */
+       TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+       if exp == "" || exp == "TAG" {
+               return true
+       }
+       return false
+}
+
+const (
+       _SubAll = "*"
+)
+
+type PullRequest struct {
+       consumerGroup string
+       mq            *kernel.MessageQueue
+       pq            *ProcessQueue
+       nextOffset    int64
+       lockedFirst   bool
+}
+
+func (pr *PullRequest) String() string {
+       return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: %d]",
+               pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
+}
+
+type ConsumerOption struct {
+       kernel.ClientOption
+       // The socket timeout, expressed as a time.Duration
+       ConsumerPullTimeout time.Duration
+
+       // Concurrently max span offset. It has no effect on sequential 
consumption
+       ConsumeConcurrentlyMaxSpan int
+
+       // Flow control threshold on queue level, each message queue will cache 
at most 1000 messages by default,
+       // Consider the PullBatchSize, the instantaneous value may exceed the 
limit
+       PullThresholdForQueue int
+
+       // Limit the cached message size on queue level, each message queue 
will cache at most 100 MiB messages by default,
+       // Consider the PullBatchSize, the instantaneous value may 
exceed the limit
+       //
+       // The size of a message is only measured by message body, so it's not 
accurate
+       PullThresholdSizeForQueue int
+
+       // Flow control threshold on topic level, default value is -1 (Unlimited)
+       //
+       // The value of PullThresholdForQueue will be overwritten and 
calculated based on
+       // PullThresholdForTopic if it isn't unlimited
+       //
+       // For example, if the value of PullThresholdForTopic is 1000 and 10 
message queues are assigned to this consumer,
+       // then PullThresholdForQueue will be set to 100
+       PullThresholdForTopic int
+
+       // Limit the cached message size on topic level, default value is -1 
MiB(Unlimited)
+       //
+       // The value of PullThresholdSizeForQueue will be overwritten and 
calculated based on
+       // PullThresholdSizeForTopic if it isn't unlimited
+       //
+       // For example, if the value of PullThresholdSizeForTopic is 1000 MiB 
and 10 message queues are
+       // assigned to this consumer, then PullThresholdSizeForQueue will be 
set to 100 MiB
+       PullThresholdSizeForTopic int
+
+       // Message pull interval
+       PullInterval time.Duration
+
+       // Batch consumption size
+       ConsumeMessageBatchMaxSize int
+
+       // Batch pull size
+       PullBatchSize int32
+
+       // Whether to update the subscription relationship on every pull
+       PostSubscriptionWhenPull bool
+
+       // Max re-consume times. -1 means 16 times.
+       //
+       // If messages are re-consumed more than MaxReconsumeTimes 
before success, it's be directed to a deletion
+       // queue waiting.
+       MaxReconsumeTimes int
+
+       // Suspending pulling time (a time.Duration despite the Millis suffix) for cases requiring slow pulling like 
flow-control scenario.
+       SuspendCurrentQueueTimeMillis time.Duration
+
+       // Maximum amount of time a message may block the consuming thread.
+       ConsumeTimeout time.Duration
+
+       ConsumerModel  MessageModel
+       Strategy       AllocateStrategy
+       ConsumeOrderly bool
+       FromWhere      ConsumeFromWhere
+       // TODO traceDispatcher
+}
+
+// TODO hook
+type defaultConsumer struct {
+       /**
+        * Consumers of the same role are required to have exactly the same
+        * subscriptions and consumerGroup to correctly achieve load balance.
+        * It's required and needs to be globally unique.
+        *
+        * See http://rocketmq.apache.org/docs/core-concept/ for further discussion.
+        */
+       consumerGroup  string
+       model          MessageModel
+       allocate       func(string, string, []*kernel.MessageQueue, []string) 
[]*kernel.MessageQueue
+       unitMode       bool
+       consumeOrderly bool
+       fromWhere      ConsumeFromWhere
+
+       cType     ConsumeType
+       client    *kernel.RMQClient
+       mqChanged func(topic string, mqAll, mqDivided []*kernel.MessageQueue)
+       state     kernel.ServiceState
+       pause     bool
+       once      sync.Once
+       option    ConsumerOption
+       // key: NOTE(review): originally documented as int hash(*kernel.MessageQueue), but persistConsumerOffset asserts keys as *kernel.MessageQueue and lock/lockAll load by message-queue value or pointer; confirm the intended key type
+       // value: *ProcessQueue
+       processQueueTable sync.Map
+
+       // key: topic(string)
+       // value: []*kernel.MessageQueue (what updateTopicSubscribeInfo stores and doBalance asserts)
+       topicSubscribeInfoTable sync.Map
+
+       // key: topic(string)
+       // value: *kernel.SubscriptionData
+       subscriptionDataTable sync.Map
+       storage               OffsetStore
+       // chan for push consumer
+       prCh chan PullRequest
+}
+
+func (dc *defaultConsumer) persistConsumerOffset() {
+       err := dc.makeSureStateOK()
+       if err != nil {
+               rlog.Errorf("consumer state error: %s", err.Error())
+               return
+       }
+       mqs := make([]*kernel.MessageQueue, 0)
+       dc.processQueueTable.Range(func(key, value interface{}) bool {
+               mqs = append(mqs, key.(*kernel.MessageQueue))
+               return true
+       })
+       dc.storage.persist(mqs)
+}
+
+// updateTopicSubscribeInfo stores mqs as the message-queue list of topic in
+// topicSubscribeInfoTable, replacing any previous entry. Topics that have no entry in
+// subscriptionDataTable are ignored and nothing is stored for them.
+func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+       if _, exist := dc.subscriptionDataTable.Load(topic); !exist {
+               return
+       }
+       // doBalance reads this value back as []*kernel.MessageQueue, so the slice is
+       // stored as-is; no hash-keyed intermediate map is needed.
+       dc.topicSubscribeInfoTable.Store(topic, mqs)
+}
+
+func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool {
+       _, exist := dc.subscriptionDataTable.Load(topic)
+       if !exist {
+               return false
+       }
+       _, exist = dc.topicSubscribeInfoTable.Load(topic)
+       return !exist
+}
+
+func (dc *defaultConsumer) doBalance() {
+       dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+               topic := key.(string)
+               if strings.HasPrefix(topic, kernel.RetryGroupTopicPrefix) {
+                       return true
+               }
+               v, exist := dc.topicSubscribeInfoTable.Load(topic)
+               if !exist {
+                       rlog.Warnf("do balance of group: %s, but topic: %s does 
not exist.", dc.consumerGroup, topic)
+                       return true
+               }
+               mqs := v.([]*kernel.MessageQueue)
+               switch dc.model {
+               case BroadCasting:
+                       changed := dc.updateProcessQueueTable(topic, mqs)
+                       if changed {
+                               dc.mqChanged(topic, mqs, mqs)
+                               rlog.Infof("messageQueueChanged, Group: %s, 
Topic: %s, MessageQueues: %v",
+                                       dc.consumerGroup, topic, mqs)
+                       }
+               case Clustering:
+                       cidAll := dc.findConsumerList(topic)
+                       if cidAll == nil {
+                               rlog.Warnf("do balance for Group: %s, Topic: %s 
get consumer id list failed",
+                                       dc.consumerGroup, topic)
+                               return true
+                       }
+                       mqAll := make([]*kernel.MessageQueue, len(mqs))
+                       copy(mqAll, mqs)
+                       sort.Strings(cidAll)
+                       sort.SliceStable(mqAll, func(i, j int) bool {
+                               v := strings.Compare(mqAll[i].Topic, 
mqAll[j].Topic)
+                               if v != 0 {
+                                       return v > 0
+                               }
+
+                               v = strings.Compare(mqAll[i].BrokerName, 
mqAll[j].BrokerName)
+                               if v != 0 {
+                                       return v > 0
+                               }
+                               return (mqAll[i].QueueId - mqAll[j].QueueId) > 0
+                       })
+                       allocateResult := dc.allocate(dc.consumerGroup, 
dc.client.ClientID(), mqAll, cidAll)
+                       changed := dc.updateProcessQueueTable(topic, 
allocateResult)
+                       if changed {
+                               dc.mqChanged(topic, mqAll, allocateResult)
+                               rlog.Infof("do balance result changed, 
allocateMessageQueueStrategyName=%s, group=%s, "+
+                                       "topic=%s, clientId=%s, mqAllSize=%d, 
cidAllSize=%d, rebalanceResultSize=%d, "+
+                                       "rebalanceResultSet=%v", 
string(dc.option.Strategy), dc.consumerGroup, topic, dc.client.ClientID(), 
len(mqAll),
+                                       len(cidAll), len(allocateResult), 
allocateResult)
+
+                       }
+               }
+               return true
+       })
+}
+
+func (dc *defaultConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
+       result := make([]*kernel.SubscriptionData, 0)
+       dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+               result = append(result, value.(*kernel.SubscriptionData))
+               return true
+       })
+       return result
+}
+
+func (dc *defaultConsumer) makeSureStateOK() error {
+       // TODO log
+       return nil //dc.state == StateRunning
+}
+
+type lockBatchRequestBody struct {
+       ConsumerGroup string                 `json:"consumerGroup"`
+       ClientId      string                 `json:"clientId"`
+       MQs           []*kernel.MessageQueue `json:"mqSet"`
+}
+
+func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
+       brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
kernel.MasterId, true)
+
+       if brokerResult == nil {
+               return false
+       }
+
+       body := &lockBatchRequestBody{
+               ConsumerGroup: dc.consumerGroup,
+               ClientId:      dc.client.ClientID(),
+               MQs:           []*kernel.MessageQueue{mq},
+       }
+       lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
+       var lockOK bool
+       for idx := range lockedMQ {
+               _mq := lockedMQ[idx]
+               v, exist := dc.processQueueTable.Load(_mq)
+               if exist {
+                       pq := v.(*ProcessQueue)
+                       pq.locked = true
+                       pq.lastConsumeTime = time.Now()
+               }
+               if _mq.Equals(mq) {
+                       lockOK = true
+               }
+       }
+       rlog.Infof("the message queue lock %v, %s %s", lockOK, 
dc.consumerGroup, mq.String())
+       return lockOK
+}
+
+func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
+       brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
kernel.MasterId, true)
+
+       if brokerResult == nil {
+               return
+       }
+
+       body := &lockBatchRequestBody{
+               ConsumerGroup: dc.consumerGroup,
+               ClientId:      dc.client.ClientID(),
+               MQs:           []*kernel.MessageQueue{mq},
+       }
+       dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
+       rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
+               dc.consumerGroup, dc.client.ClientID(), mq.String())
+}
+
+// lockAll asks the master broker of every broker that owns entries in
+// processQueueTable to lock those queues for this consumer group, then records
+// the outcome on each ProcessQueue: queues the broker granted are marked locked
+// and get a fresh lastLockTime, queues it did not grant are marked unlocked.
+//
+// The mq parameter is not used; it is kept so existing callers still compile.
+func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
+       mqMapSet := dc.buildProcessQueueTableByBrokerName()
+       for broker, mqs := range mqMapSet {
+               if len(mqs) == 0 {
+                       continue
+               }
+               brokerResult := kernel.FindBrokerAddressInSubscribe(broker, kernel.MasterId, true)
+               if brokerResult == nil {
+                       continue
+               }
+               body := &lockBatchRequestBody{
+                       ConsumerGroup: dc.consumerGroup,
+                       ClientId:      dc.client.ClientID(),
+                       MQs:           mqs,
+               }
+               lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
+
+               // doLock returns queue values while processQueueTable is keyed by the
+               // pointers held in mqs, so granted queues are matched by hash code.
+               granted := make(map[int]bool, len(lockedMQ))
+               for idx := range lockedMQ {
+                       granted[lockedMQ[idx].HashCode()] = true
+               }
+
+               now := time.Now()
+               for idx := range mqs {
+                       _mq := mqs[idx]
+                       v, exist := dc.processQueueTable.Load(_mq)
+                       if !exist {
+                               continue
+                       }
+                       pq := v.(*ProcessQueue)
+                       if granted[_mq.HashCode()] {
+                               pq.locked = true
+                               pq.lastLockTime = now
+                               continue
+                       }
+                       // The broker did not grant this queue: it must not stay marked as locked.
+                       pq.locked = false
+                       rlog.Warnf("the message queue: %s locked Failed, Group: %s", _mq.String(), dc.consumerGroup)
+               }
+       }
+}
+
+// unlockAll sends an unlock request to the master broker of every broker that
+// owns entries in processQueueTable and clears the locked flag on each of the
+// corresponding process queues.
+//
+// oneway is forwarded to doUnlock and selects a fire-and-forget request instead
+// of a synchronous one.
+func (dc *defaultConsumer) unlockAll(oneway bool) {
+       mqMapSet := dc.buildProcessQueueTableByBrokerName()
+       for broker, mqs := range mqMapSet {
+               if len(mqs) == 0 {
+                       continue
+               }
+               brokerResult := kernel.FindBrokerAddressInSubscribe(broker, kernel.MasterId, true)
+               if brokerResult == nil {
+                       continue
+               }
+               body := &lockBatchRequestBody{
+                       ConsumerGroup: dc.consumerGroup,
+                       ClientId:      dc.client.ClientID(),
+                       MQs:           mqs,
+               }
+               dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
+               for idx := range mqs {
+                       _mq := mqs[idx]
+                       v, exist := dc.processQueueTable.Load(_mq)
+                       if !exist {
+                               continue
+                       }
+                       v.(*ProcessQueue).locked = false
+                       // The previous text was copied from the lock-failure path and was misleading.
+                       rlog.Warnf("the message queue: %s unlocked, Group: %s", _mq.String(), dc.consumerGroup)
+               }
+       }
+}
+
+// doLock sends a synchronous ReqLockBatchMQ request carrying body to the broker
+// at addr and returns the queues listed under "lockOKMQSet" in the response.
+//
+// It returns nil when the request body cannot be encoded, the call fails, or the
+// response body cannot be decoded; each failure is logged.
+func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []kernel.MessageQueue {
+       data, err := json.Marshal(body)
+       if err != nil {
+               rlog.Errorf("marshal lock mq request body error %s", err.Error())
+               return nil
+       }
+       request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
+       response, err := remote.InvokeSync(addr, request, 1*time.Second)
+       if err != nil {
+               rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
+               return nil
+       }
+       lockOKMQSet := struct {
+               MQs []kernel.MessageQueue `json:"lockOKMQSet"`
+       }{}
+       if err = json.Unmarshal(response.Body, &lockOKMQSet); err != nil {
+               rlog.Errorf("Unmarshal lock mq body error %s", err.Error())
+               return nil
+       }
+       return lockOKMQSet.MQs
+}
+
+// doUnlock sends a ReqUnlockBatchMQ request carrying body to the broker at addr.
+//
+// With oneway set the request is sent without waiting for a reply; otherwise the
+// call is synchronous with a one second timeout. Failures are logged and never
+// returned to the caller.
+func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, oneway bool) {
+       data, err := json.Marshal(body)
+       if err != nil {
+               rlog.Errorf("marshal unlock mq request body error %s", err.Error())
+               return
+       }
+       request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
+       if oneway {
+               if err := remote.InvokeOneWay(addr, request); err != nil {
+                       rlog.Errorf("unlock mq to broker with oneway: %s error %s", addr, err.Error())
+               }
+               return
+       }
+
+       response, err := remote.InvokeSync(addr, request, 1*time.Second)
+       if err != nil {
+               rlog.Errorf("unlock mq to broker: %s error %s", addr, err.Error())
+               // Do not touch response after a failed call; it may be nil.
+               return
+       }
+       if response.Code != kernel.ResSuccess {
+               rlog.Errorf("unlock mq to broker: %s failed, response code: %v", addr, response.Code)
+       }
+}
+
+// buildProcessQueueTableByBrokerName groups the message queues that are keys of
+// processQueueTable by their BrokerName.
+func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*kernel.MessageQueue {
+       grouped := make(map[string][]*kernel.MessageQueue, 0)
+
+       collect := func(key, _ interface{}) bool {
+               queue := key.(*kernel.MessageQueue)
+               // append on a missing (nil) entry starts a new slice for that broker.
+               grouped[queue.BrokerName] = append(grouped[queue.BrokerName], queue)
+               return true
+       }
+       dc.processQueueTable.Range(collect)
+
+       return grouped
+}
+
+// updateProcessQueueTable reconciles processQueueTable with mqs, the queues of
+// topic that should now be consumed by this instance.
+//
+// Entries of topic that are no longer in mqs, or (for push consumers) whose pull
+// has expired, are marked dropped and removed from the table. For push
+// consumers, every queue in mqs that is not in the table afterwards gets a new
+// ProcessQueue and an initial PullRequest is sent on prCh.
+//
+// It returns true when at least one entry was removed or added.
+//
+// NOTE(review): membership in mqs is decided by pointer identity, so callers
+// have to pass the same *kernel.MessageQueue values that were stored earlier.
+func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.MessageQueue) bool {
+       var changed bool
+       mqSet := make(map[*kernel.MessageQueue]bool)
+       for idx := range mqs {
+               mqSet[mqs[idx]] = true
+       }
+
+       dc.processQueueTable.Range(func(key, value interface{}) bool {
+               mq := key.(*kernel.MessageQueue)
+               pq := value.(*ProcessQueue)
+               if mq.Topic != topic {
+                       return true
+               }
+               if !mqSet[mq] {
+                       pq.dropped = true
+                       if dc.removeUnnecessaryMessageQueue(mq, pq) {
+                               // Remove the stale entry from the table itself; deleting it from
+                               // the local mqSet (where it is absent anyway) had no effect.
+                               dc.processQueueTable.Delete(key)
+                               changed = true
+                               rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+                       }
+               } else if pq.isPullExpired() && dc.cType == _PushConsume {
+                       pq.dropped = true
+                       if dc.removeUnnecessaryMessageQueue(mq, pq) {
+                               // Dropping the entry lets the loop below recreate it with a fresh
+                               // ProcessQueue and pull request.
+                               dc.processQueueTable.Delete(key)
+                               changed = true
+                               rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s, "+
+                                       "because pull was paused, so try to fixed it", dc.consumerGroup, mq.String())
+                       }
+               }
+               return true
+       })
+
+       if dc.cType == _PushConsume {
+               for mq := range mqSet {
+                       if dc.consumeOrderly && !dc.lock(mq) {
+                               rlog.Warnf("do defaultConsumer, Group:%s add a new mq failed, %s, because lock failed",
+                                       dc.consumerGroup, mq.String())
+                               continue
+                       }
+                       dc.storage.remove(mq)
+                       nextOffset := dc.computePullFromWhere(mq)
+                       if nextOffset < 0 {
+                               rlog.Warnf("do defaultConsumer failed, Group:%s, add new mq failed, %s", dc.consumerGroup, mq.String())
+                               continue
+                       }
+                       if _, exist := dc.processQueueTable.Load(mq); exist {
+                               rlog.Debugf("do defaultConsumer, Group: %s, mq already exist, %s", dc.consumerGroup, mq.String())
+                               continue
+                       }
+                       rlog.Infof("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
+                       pq := &ProcessQueue{}
+                       dc.processQueueTable.Store(mq, pq)
+                       pr := PullRequest{
+                               consumerGroup: dc.consumerGroup,
+                               mq:            mq,
+                               pq:            pq,
+                               nextOffset:    nextOffset,
+                       }
+                       dc.prCh <- pr
+                       changed = true
+               }
+       }
+
+       return changed
+}
+
+// removeUnnecessaryMessageQueue persists the offset of mq through the offset
+// store and then removes mq from that store. It always returns true; the extra
+// handling for orderly push consumers in clustering mode is not implemented yet.
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *ProcessQueue) bool {
+       single := []*kernel.MessageQueue{mq}
+       dc.storage.persist(single)
+       dc.storage.remove(mq)
+
+       switch {
+       case dc.cType != _PushConsume, !dc.consumeOrderly, Clustering != dc.model:
+               // nothing else to do for this kind of consumer
+       default:
+               // TODO
+       }
+       return true
+}
+
+// computePullFromWhere returns the offset the first pull of mq should start
+// from, or -1 when it cannot be determined.
+//
+// Pull consumers always start at 0. For other consumers the offset read from
+// the offset store wins when it is non-negative. When the store reports -1 the
+// result depends on dc.fromWhere:
+//   - ConsumeFromLastOffset: 0 for retry topics, otherwise the queue's current
+//     max offset.
+//   - ConsumeFromFirstOffset: 0.
+//   - ConsumeFromTimestamp: the max offset for retry topics; searching by
+//     timestamp for other topics is not implemented yet and yields -1.
+func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
+       if dc.cType == _PullConsume {
+               return 0
+       }
+       lastOffset := dc.storage.read(mq, _ReadFromStore)
+       if lastOffset >= 0 {
+               return lastOffset
+       }
+       if lastOffset != -1 {
+               // Any other negative value is treated as an error from the store.
+               return -1
+       }
+
+       // maxOffset asks the broker for the queue's max offset, or -1 on failure.
+       maxOffset := func() int64 {
+               offset, err := kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+               if err != nil {
+                       rlog.Warnf("query max offset of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+                       return -1
+               }
+               return offset
+       }
+       isRetryTopic := strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix)
+
+       switch dc.fromWhere {
+       case ConsumeFromLastOffset:
+               // Retry topics are consumed from the beginning; only normal topics
+               // skip to the newest message. The branches used to be swapped, which
+               // left normal topics with -1.
+               if isRetryTopic {
+                       return 0
+               }
+               return maxOffset()
+       case ConsumeFromFirstOffset:
+               return 0
+       case ConsumeFromTimestamp:
+               if isRetryTopic {
+                       return maxOffset()
+               }
+               // TODO parse timestamp
+               return -1
+       default:
+               return -1
+       }
+}
+
+// findConsumerList asks a broker serving topic for the client ids registered
+// under this consumer group. The topic route is refreshed once when no broker
+// address is known. It returns nil when no broker can be found or the request
+// fails.
+func (dc *defaultConsumer) findConsumerList(topic string) []string {
+       addr := kernel.FindBrokerAddrByTopic(topic)
+       if addr == "" {
+               kernel.UpdateTopicRouteInfo(topic)
+               addr = kernel.FindBrokerAddrByTopic(topic)
+       }
+       if addr == "" {
+               return nil
+       }
+
+       header := &kernel.GetConsumerList{
+               ConsumerGroup: dc.consumerGroup,
+       }
+       command := remote.NewRemotingCommand(kernel.ReqGetConsumerListByGroup, header, nil)
+       // TODO: the timeout mechanism is problematic
+       reply, err := remote.InvokeSync(addr, command, 3*time.Second)
+       if err != nil {
+               rlog.Errorf("get consumer list of [%s] from %s error: %s", dc.consumerGroup, addr, err.Error())
+               return nil
+       }
+
+       ids := make([]string, 0)
+       for _, item := range gjson.ParseBytes(reply.Body).Get("consumerIdList").Array() {
+               ids = append(ids, item.String())
+       }
+       return ids
+}
+
+// buildSubscriptionData converts selector into the subscription description for
+// topic.
+//
+// Selectors of a type other than TAG are passed through unchanged. An empty
+// selector type is treated as TAG. For TAG selectors an empty expression or
+// _SubAll subscribes to everything; otherwise the expression is split on "||"
+// and every non-empty, space-trimmed tag is recorded in Tags together with its
+// hash in Codes.
+func buildSubscriptionData(topic string, selector MessageSelector) *kernel.SubscriptionData {
+       subData := &kernel.SubscriptionData{
+               Topic:     topic,
+               SubString: selector.Expression,
+               ExpType:   string(selector.Type),
+       }
+
+       if selector.Type != "" && selector.Type != TAG {
+               return subData
+       }
+       subData.ExpType = string(TAG)
+
+       if selector.Expression == "" || selector.Expression == _SubAll {
+               subData.SubString = _SubAll
+               return subData
+       }
+
+       // The struct literal above leaves both maps nil; writing to a nil map panics.
+       if subData.Tags == nil {
+               subData.Tags = make(map[string]bool)
+       }
+       if subData.Codes == nil {
+               subData.Codes = make(map[int32]bool)
+       }
+
+       // strings.Split takes a literal separator, not a regular expression, so the
+       // tag separator is "||" rather than the regex-escaped form.
+       for _, tag := range strings.Split(selector.Expression, "||") {
+               tag = strings.Trim(tag, " ")
+               if tag == "" {
+                       continue
+               }
+               subData.Tags[tag] = true
+               subData.Codes[int32(utils.HashString(tag))] = true
+       }
+       return subData
+}
+
+// getNextQueueOf returns the next queue of topic in round-robin order, starting
+// with the first queue on the first call for a topic. It returns nil when the
+// queues cannot be fetched or the topic has no queues.
+//
+// The per-topic position is kept in queueCounterTable as a *int64 so that it is
+// advanced atomically and survives between calls.
+func getNextQueueOf(topic string) *kernel.MessageQueue {
+       queues, err := kernel.FetchSubscribeMessageQueues(topic)
+       if err != nil {
+               rlog.Error(err.Error())
+               return nil
+       }
+       if len(queues) == 0 {
+               // Nothing to choose from; indexing below would divide by zero.
+               return nil
+       }
+
+       v, _ := queueCounterTable.LoadOrStore(topic, new(int64))
+       counter, ok := v.(*int64)
+       if !ok {
+               // Replace a value of an unexpected type instead of panicking on it.
+               counter = new(int64)
+               queueCounterTable.Store(topic, counter)
+       }
+
+       next := atomic.AddInt64(counter, 1) - 1
+       // The unsigned conversion keeps the index non-negative if the counter wraps.
+       return queues[int(uint64(next)%uint64(len(queues)))]
+}
+
+// buildSysFlag packs the four options into a bit mask: commitOffset is bit 0,
+// suspend bit 1, subscription bit 2 and classFilter bit 3.
+func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
+       var flag int32
+       options := [...]bool{commitOffset, suspend, subscription, classFilter}
+       for bit, enabled := range options {
+               if enabled {
+                       flag |= 1 << uint(bit)
+               }
+       }
+       return flag
+}
+
+// clearCommitOffsetFlag returns sysFlag with bit 0 (the commit-offset bit set by
+// buildSysFlag) cleared and every other bit untouched.
+func clearCommitOffsetFlag(sysFlag int32) int32 {
+       const commitOffsetBit = 0x1
+       return sysFlag &^ commitOffsetBit
+}
+
+// tryFindBroker looks up the broker address to pull mq from, using the broker id
+// chosen by recalculatePullFromWhichNode. When the first lookup finds nothing
+// the topic route is refreshed and the lookup is repeated once; the result may
+// still be nil.
+func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+       result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+       if result != nil {
+               // Found on the first attempt: no need to look it up a second time.
+               return result
+       }
+
+       kernel.UpdateTopicRouteInfo(mq.Topic)
+       return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+}
+
+var (
+       // pullFromWhichNodeTable maps a message queue's hash code to the broker id
+       // stored by updatePullFromWhichNode. recalculatePullFromWhichNode reads it
+       // and falls back to kernel.MasterId when a queue has no entry.
+       pullFromWhichNodeTable sync.Map
+)
+
+// updatePullFromWhichNode records brokerId as the broker to use for mq, keyed by
+// the queue's hash code.
+func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+       queueKey := mq.HashCode()
+       pullFromWhichNodeTable.Store(queueKey, brokerId)
+}
+
+// recalculatePullFromWhichNode returns the broker id recorded for mq, or
+// kernel.MasterId when nothing has been recorded for it.
+func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+       recorded, found := pullFromWhichNodeTable.Load(mq.HashCode())
+       if !found {
+               return kernel.MasterId
+       }
+       return recorded.(int64)
+}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
new file mode 100644
index 0000000..a11280d
--- /dev/null
+++ b/consumer/offset_store.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import "github.com/apache/rocketmq-client-go/kernel"
+
+// readType is the mode argument of OffsetStore.read.
+type readType int
+
+// Modes accepted by OffsetStore.read.
+// NOTE(review): going by the names these select the in-memory value, the
+// persisted value, or memory first with the store as fallback; the stub
+// implementations in this file ignore the mode, so confirm once they are filled
+// in.
+const (
+       _ReadFromMemory readType = iota
+       _ReadFromStore
+       _ReadMemoryThenStore
+)
+
+// OffsetStore is the storage abstraction the consumer uses for per-queue
+// consume offsets. All methods are unexported, so only types in this package
+// can implement it.
+type OffsetStore interface {
+       // load has no parameters or results; presumably it initialises the store
+       // from its backing medium (TODO confirm once implemented).
+       load()
+       // persist is called with the queues whose offsets should be written out.
+       persist(mqs []*kernel.MessageQueue)
+       // remove is called when mq no longer needs to be tracked by the store.
+       remove(mq *kernel.MessageQueue)
+       // read returns the offset known for mq using mode t. Callers in this
+       // package treat a negative result as "no usable offset".
+       read(mq *kernel.MessageQueue, t readType) int64
+       // update sets the offset of mq. NOTE(review): increaseOnly presumably
+       // rejects values lower than the current one; confirm with the
+       // implementation.
+       update(mq *kernel.MessageQueue, offset int64, increaseOnly bool)
+}
+
+// localFileOffsetStore is a placeholder OffsetStore: every method is a no-op
+// and read always reports offset 0.
+type localFileOffsetStore struct{}
+
+// load does nothing.
+func (s *localFileOffsetStore) load() {}
+
+// persist does nothing.
+func (s *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {}
+
+// remove does nothing.
+func (s *localFileOffsetStore) remove(mq *kernel.MessageQueue) {}
+
+// read ignores its arguments and returns 0.
+func (s *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+       return 0
+}
+
+// update does nothing.
+func (s *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {}
+
+// remoteBrokerOffsetStore is a placeholder OffsetStore: every method is a no-op
+// and read always reports offset 0.
+type remoteBrokerOffsetStore struct{}
+
+// load does nothing.
+func (s *remoteBrokerOffsetStore) load() {}
+
+// persist does nothing.
+func (s *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {}
+
+// remove does nothing.
+func (s *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {}
+
+// read ignores its arguments and returns 0.
+func (s *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+       return 0
+}
+
+// update does nothing.
+func (s *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
new file mode 100644
index 0000000..0db0766
--- /dev/null
+++ b/consumer/process_queue.go
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "container/list"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "sync"
+       "time"
+)
+
+// Timing constants for process queues. None of them is referenced by the code
+// in this file yet.
+// NOTE(review): by name these are the maximum age of a rebalance lock, the
+// interval between rebalances, and the idle time after which a pull counts as
+// expired; confirm against the callers.
+const (
+       _RebalanceLockMaxTime = 30 * time.Second
+       _RebalanceInterval    = 20 * time.Second
+       _PullMaxIdleTime      = 120 * time.Second
+)
+
+// ProcessQueue holds the client-side state of one message queue: the messages
+// pulled but not yet handed to the application, plus bookkeeping flags and
+// timestamps. It contains mutexes and a sync.Once and therefore must not be
+// copied after first use.
+type ProcessQueue struct {
+       // mutex is taken by putMessage, removeMessage and takeMessages around
+       // their modifications of msgCache.
+       mutex                      sync.RWMutex
+       // msgCache holds *kernel.MessageExt values in arrival order.
+       msgCache                   list.List // sorted
+       cachedMsgCount             int
+       cachedMsgSize              int64
+       consumeLock                sync.Mutex
+       consumingMsgOrderlyTreeMap sync.Map
+       tryUnlockTimes             int64
+       queueOffsetMax             int64
+       // dropped is set when the owning consumer decides to stop processing
+       // this queue.
+       dropped                    bool
+       lastPullTime               time.Time
+       lastConsumeTime            time.Time
+       // locked is written by the consumer's lock and unlock routines.
+       locked                     bool
+       lastLockTime               time.Time
+       consuming                  bool
+       msgAccCnt                  int64
+       // once guards the one-time msgCache.Init call in putMessage.
+       once                       sync.Once
+}
+
+// isPullExpired currently always reports false; neither lastPullTime nor
+// _PullMaxIdleTime is consulted.
+// NOTE(review): this looks like a placeholder for an idle-time check; confirm
+// the intended rule before relying on it.
+func (pq *ProcessQueue) isPullExpired() bool {
+       return false
+}
+
+// getMaxSpan returns the number of messages currently held in msgCache. It
+// reads the list without taking pq.mutex.
+func (pq *ProcessQueue) getMaxSpan() int {
+       return pq.msgCache.Len()
+}
+
+// putMessage appends messages, in order, to the end of the message cache. The
+// cache is initialised on the first call.
+func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
+       pq.once.Do(func() { pq.msgCache.Init() })
+
+       pq.mutex.Lock()
+       defer pq.mutex.Unlock()
+       for _, msg := range messages {
+               pq.msgCache.PushBack(msg)
+       }
+}
+
+// removeMessage discards up to number messages from the front of the message
+// cache and returns how many were actually removed.
+func (pq *ProcessQueue) removeMessage(number int) int {
+       pq.mutex.Lock()
+       defer pq.mutex.Unlock()
+
+       removed := 0
+       for removed < number {
+               front := pq.msgCache.Front()
+               if front == nil {
+                       break
+               }
+               pq.msgCache.Remove(front)
+               removed++
+       }
+       return removed
+}
+
+// takeMessages blocks until the message cache is non-empty, then removes and
+// returns up to number messages from its front, oldest first. A non-positive
+// number yields an empty slice (after the same wait).
+//
+// The emptiness check is done while holding pq.mutex so it cannot race with
+// putMessage or removeMessage; the cache is polled every 10ms while empty.
+func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
+       for {
+               pq.mutex.Lock()
+               if pq.msgCache.Len() > 0 {
+                       break
+               }
+               pq.mutex.Unlock()
+               time.Sleep(10 * time.Millisecond)
+       }
+       // The loop exits with the mutex held.
+       defer pq.mutex.Unlock()
+
+       count := pq.msgCache.Len()
+       if number < count {
+               count = number
+       }
+       if count < 0 {
+               // A negative request used to panic in make.
+               count = 0
+       }
+
+       result := make([]*kernel.MessageExt, 0, count)
+       for len(result) < count {
+               e := pq.msgCache.Front()
+               result = append(result, e.Value.(*kernel.MessageExt))
+               pq.msgCache.Remove(e)
+       }
+       return result
+}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
new file mode 100644
index 0000000..1b4f950
--- /dev/null
+++ b/consumer/pull_consumer.go
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "strconv"
+       "sync"
+)
+
+// MessageSelector describes which messages of a topic a consumer wants.
+type MessageSelector struct {
+       // Type is the kind of filter expression. An empty value is handled like
+       // TAG when the subscription data is built.
+       Type       ExpressionType
+       // Expression is the filter text. For TAG selectors an empty string or the
+       // subscribe-all marker selects every message.
+       Expression string
+}
+
+// PullConsumer is a consumer whose caller fetches messages explicitly.
+type PullConsumer interface {
+       // Start prepares the consumer for use.
+       Start()
+       // Shutdown stops the consumer.
+       Shutdown()
+       // Pull fetches up to numbers messages of topic that match selector.
+       Pull(ctx context.Context, topic string, selector MessageSelector, 
+numbers int) (*kernel.PullResult, error)
+}
+
+var (
+       // queueCounterTable holds, per topic name, the round-robin position used
+       // by getNextQueueOf to pick the next queue to pull from.
+       queueCounterTable sync.Map
+)
+
+// NewConsumer returns a pull consumer that carries config as its option. Every
+// other field is left at its zero value; call Start before pulling.
+func NewConsumer(config ConsumerOption) *defaultPullConsumer {
+       c := new(defaultPullConsumer)
+       c.option = config
+       return c
+}
+
+// defaultPullConsumer is the pull consumer created by NewConsumer.
+type defaultPullConsumer struct {
+       // state is set to kernel.StateRunning by Start and checked before pulls.
+       state     kernel.ServiceState
+       // option is the configuration passed to NewConsumer.
+       option    ConsumerOption
+       // client performs the PullMessage call. NOTE(review): nothing in this
+       // file assigns it; confirm where it is initialised.
+       client    *kernel.RMQClient
+       // GroupName is sent as the consumer group of pull requests.
+       GroupName string
+       Model     MessageModel
+       UnitMode  bool
+}
+
+// Start marks the consumer as running. It performs no other initialisation.
+func (c *defaultPullConsumer) Start() {
+       c.state = kernel.StateRunning
+}
+
+// Pull fetches up to numbers messages of topic that match selector from the
+// next queue in round-robin order and post-processes the result with
+// processPullResult.
+//
+// ctx is forwarded to the underlying pull request; a nil ctx is replaced by
+// context.Background(). An error is returned when the topic has no queue or the
+// pull itself fails.
+func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error) {
+       if ctx == nil {
+               ctx = context.Background()
+       }
+
+       mq := getNextQueueOf(topic)
+       if mq == nil {
+               return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
+       }
+
+       data := buildSubscriptionData(mq.Topic, selector)
+       // Use the caller's context so cancellation and deadlines are honoured.
+       result, err := c.pull(ctx, mq, data, c.nextOffsetOf(mq), numbers)
+       if err != nil {
+               return nil, err
+       }
+
+       processPullResult(mq, result, data)
+       return result, nil
+}
+
+// SubscribeWithChan is meant for consumption with manual acknowledgement. It is
+// not implemented yet and always returns (nil, nil).
+// NOTE(review): topic shares the MessageSelector type with selector in this
+// signature; it was probably meant to be a string.
+func (c *defaultPullConsumer) SubscribeWithChan(topic, selector 
+MessageSelector) (chan *kernel.Message, error) {
+       return nil, nil
+}
+
+// SubscribeWithFunc is meant for consumption with automatic acknowledgement
+// based on the result of f. It is not implemented yet and always returns nil.
+// NOTE(review): topic shares the MessageSelector type with selector in this
+// signature; it was probably meant to be a string.
+func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector 
+MessageSelector,
+       f func(msg *kernel.Message) ConsumeResult) error {
+       return nil
+}
+
+// ACK is the acknowledgement hook for msg with the given result. It is not
+// implemented yet and does nothing.
+func (c *defaultPullConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+
+}
+
+// pull sends one pull request for mq, starting at offset and asking for at most
+// numbers messages (values <= 0 are raised to 1), filtered according to data.
+//
+// It returns an error when the consumer is not running, when mq or data is nil,
+// when offset is negative, when no broker can be found for the queue, or when
+// the broker is older than V4_1_0 and a filter type other than TAG is requested.
+func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
+       offset int64, numbers int) (*kernel.PullResult, error) {
+       err := c.makeSureStateOK()
+       if err != nil {
+               return nil, err
+       }
+
+       if mq == nil {
+               return nil, errors.New("MessageQueue is nil")
+       }
+
+       if data == nil {
+               return nil, errors.New("SubscriptionData is nil")
+       }
+
+       if offset < 0 {
+               return nil, errors.New("offset < 0")
+       }
+
+       if numbers <= 0 {
+               numbers = 1
+       }
+       c.subscriptionAutomatically(mq.Topic)
+
+       brokerResult := tryFindBroker(mq)
+       if brokerResult == nil {
+               return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
+       }
+
+       // An empty expression type is the default and means TAG.
+       isTagType := data.ExpType == "" || data.ExpType == string(TAG)
+
+       // Only filter types other than TAG need a broker of at least V4_1_0; the
+       // check used to be inverted and rejected plain TAG filters on old brokers.
+       if !isTagType && brokerResult.BrokerVersion < kernel.V4_1_0 {
+               return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+                       mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+       }
+
+       sysFlag := buildSysFlag(false, true, true, false)
+
+       if brokerResult.Slave {
+               sysFlag = clearCommitOffsetFlag(sysFlag)
+       }
+       pullRequest := &kernel.PullMessageRequest{
+               ConsumerGroup:        c.GroupName,
+               Topic:                mq.Topic,
+               QueueId:              int32(mq.QueueId),
+               QueueOffset:          offset,
+               MaxMsgNums:           int32(numbers),
+               SysFlag:              sysFlag,
+               CommitOffset:         0,
+               SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+               SubExpression:        data.SubString,
+               ExpressionType:       string(data.ExpType),
+       }
+
+       if data.ExpType == string(TAG) {
+               pullRequest.SubVersion = 0
+       } else {
+               pullRequest.SubVersion = data.SubVersion
+       }
+
+       // TODO computePullFromWhichFilterServer
+       return c.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+// makeSureStateOK returns nil while the consumer is in the running state and an
+// error naming the current state otherwise.
+func (c *defaultPullConsumer) makeSureStateOK() error {
+       if c.state == kernel.StateRunning {
+               return nil
+       }
+       return fmt.Errorf("the consumer state is [%d], not running", c.state)
+}
+
+// subscriptionAutomatically is called by pull with the topic being pulled. It
+// is not implemented yet and does nothing.
+func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
+       // TODO
+}
+
+// nextOffsetOf returns the offset the next pull of queue starts from. It
+// currently always returns 0, so no progress is tracked between pulls.
+func (c *defaultPullConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+       return 0
+}
+
+// processPullResult post-processes a pull result for mq in place.
+//
+// It records the broker suggested by the result for the next pull. When
+// messages were found it filters them by tag again on the client side (only
+// when data has tags and class-filter mode is off), sets the transaction id on
+// prepared transaction messages, and stamps every remaining message with the
+// result's min and max offset properties before storing the list back.
+func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+       updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+       if result.Status != kernel.PullFound {
+               return
+       }
+
+       msgs := result.GetMessageExts()
+       msgListFilterAgain := msgs
+       // The client-side tag check applies only when the broker is not running a
+       // class filter for this subscription.
+       if len(data.Tags) > 0 && !data.ClassFilterMode {
+               // Length 0 with capacity len(msgs): a non-zero length here would
+               // leave nil entries in front of the appended messages.
+               msgListFilterAgain = make([]*kernel.MessageExt, 0, len(msgs))
+               for _, msg := range msgs {
+                       if _, exist := data.Tags[msg.GetTags()]; exist {
+                               msgListFilterAgain = append(msgListFilterAgain, msg)
+                       }
+               }
+       }
+
+       // TODO hook
+
+       minOffset := strconv.FormatInt(result.MinOffset, 10)
+       maxOffset := strconv.FormatInt(result.MaxOffset, 10)
+       for _, msg := range msgListFilterAgain {
+               if msg.Properties == nil {
+                       // Writing to a nil map panics.
+                       msg.Properties = make(map[string]string)
+               }
+               traFlag, _ := strconv.ParseBool(msg.Properties[kernel.PropertyTransactionPrepared])
+               if traFlag {
+                       msg.TransactionId = msg.Properties[kernel.PropertyUniqueClientMessageIdKeyIndex]
+               }
+
+               msg.Properties[kernel.PropertyMinOffset] = minOffset
+               msg.Properties[kernel.PropertyMaxOffset] = maxOffset
+       }
+
+       result.SetMessageExts(msgListFilterAgain)
+}
diff --git a/consumer/pull_consumer_test.go b/consumer/pull_consumer_test.go
new file mode 100644
index 0000000..f7bc454
--- /dev/null
+++ b/consumer/pull_consumer_test.go
@@ -0,0 +1,18 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
new file mode 100644
index 0000000..58f4c68
--- /dev/null
+++ b/consumer/push_consumer.go
@@ -0,0 +1,606 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "math"
+       "strconv"
+       "time"
+)
+
+// ConsumeResult is the outcome a consume callback reports for a batch of messages.
+type ConsumeResult int
+
+// Mb is the number of bytes in one mebibyte.
+const Mb = 1024 * 1024
+
+const (
+       // ConsumeSuccess reports that the batch was consumed successfully.
+       //
+       // The enumeration starts at 1 (iota + 1) on purpose: these constants used to
+       // share a const block with Mb, which occupied iota 0, and the numeric values
+       // are kept unchanged. The zero value of ConsumeResult therefore matches
+       // neither constant.
+       ConsumeSuccess ConsumeResult = iota + 1
+       // ConsumeRetryLater reports that the batch was not consumed and should be
+       // delivered again later.
+       ConsumeRetryLater
+)
+
+// PushConsumer is, in most scenarios, the recommended way to consume messages.
+//
+// Technically speaking, this push client is virtually a wrapper of the underlying
+// pull service. Specifically, on arrival of messages pulled from brokers, it
+// invokes the registered callback handler to feed the messages.
+//
+// See quick start/Consumer in the example module for a typical usage.
+//
+// Thread Safety: after initialization, the instance can be regarded as thread-safe.
+type PushConsumer interface {
+       // Start launches the consumer.
+       Start() error
+       // Shutdown stops the consumer.
+       Shutdown()
+       // Subscribe registers topic with the given selector and installs f as the
+       // callback that receives pulled messages.
+       Subscribe(topic string, selector MessageSelector,
+               f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
+}
+
+type pushConsumer struct { // push-style consumer state layered on the embedded defaultConsumer
+       *defaultConsumer
+       /**
+        * Backtracking consumption time. The note this comment was derived from
+        * describes a second-precision timestamp such as 20131223171201, i.e.
+        * 17:12:01 on December 23, 2013. Here the field is a time.Duration that
+        * NewPushConsumer initializes to 30 minutes (half an hour ago by default).
+        * NOTE(review): no reader of this field is visible in this file - confirm
+        * how the duration is meant to be applied.
+        */
+       ConsumeTimestamp             time.Duration
+       queueFlowControlTimes        int // incremented by pullMessage on each cache count/size flow-control hit; throttles the warning log
+       queueMaxSpanFlowControlTimes int // read by pullMessage to throttle the max-span flow-control warning
+       consume                      func(*ConsumeMessageContext, 
[]*kernel.MessageExt) (ConsumeResult, error) // user callback installed by Subscribe
+       submitToConsume              func(*ProcessQueue, *kernel.MessageQueue) // set by NewPushConsumer to the orderly or concurrent consume routine
+       subscribedTopic              map[string]string // topics registered through Subscribe; the value stored is always ""
+}
+
+// NewPushConsumer builds a PushConsumer for consumerGroup from opt.
+//
+// The queue allocation function is chosen from opt.Strategy; any unrecognized
+// strategy falls back to the averagely allocation. The consume routine is the
+// orderly one when opt.ConsumeOrderly is set and the concurrent one otherwise.
+// The returned consumer is in the kernel.StateCreateJust state.
+func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+       base := &defaultConsumer{
+               consumerGroup:  consumerGroup,
+               cType:          _PushConsume,
+               state:          kernel.StateCreateJust,
+               prCh:           make(chan PullRequest, 4),
+               model:          opt.ConsumerModel,
+               consumeOrderly: opt.ConsumeOrderly,
+               fromWhere:      opt.FromWhere,
+               option:         opt,
+       }
+
+       // Averagely is both an explicit strategy and the fallback for unknown values.
+       base.allocate = allocateByAveragely
+       switch opt.Strategy {
+       case StrategyAveragelyCircle:
+               base.allocate = allocateByAveragelyCircle
+       case StrategyConfig:
+               base.allocate = allocateByConfig
+       case StrategyConsistentHash:
+               base.allocate = allocateByConsistentHash
+       case StrategyMachineNearby:
+               base.allocate = allocateByMachineNearby
+       case StrategyMachineRoom:
+               base.allocate = allocateByMachineRoom
+       }
+
+       consumer := &pushConsumer{
+               defaultConsumer:  base,
+               ConsumeTimestamp: 30 * time.Minute,
+               subscribedTopic:  make(map[string]string, 0),
+       }
+       base.mqChanged = consumer.messageQueueChanged
+
+       consumer.submitToConsume = consumer.consumeMessageCurrently
+       if consumer.consumeOrderly {
+               consumer.submitToConsume = consumer.consumeMessageOrderly
+       }
+       return consumer
+}
+
+// Start launches the consumer.
+//
+// The first call performs one-time initialization: it validates the options,
+// subscribes to the group's retry topic in Clustering mode, obtains the shared
+// client, selects and loads the offset store, starts the goroutine that turns
+// every PullRequest received on prCh into a pull task, and registers the
+// consumer with the client. Every successful call then refreshes the topic
+// routes, triggers a rebalance, checks the client in the broker and sends a
+// heartbeat.
+//
+// It returns an error when the consumer group cannot be registered; in that
+// case none of the follow-up calls are made on the unregistered client.
+func (pc *pushConsumer) Start() error {
+       var err error
+       pc.once.Do(func() {
+               rlog.Infof("the consumerGroup=%s start beginning. messageModel=%v, unitMode=%v",
+                       pc.consumerGroup, pc.model, pc.unitMode)
+               pc.state = kernel.StateStartFailed
+               pc.validate()
+
+               // set retry topic
+               if pc.model == Clustering {
+                       retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
+                       pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
+                               MessageSelector{TAG, _SubAll}))
+               }
+
+               pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
+               if pc.model == Clustering {
+                       pc.option.ChangeInstanceNameToPID()
+                       pc.storage = &remoteBrokerOffsetStore{}
+               } else {
+                       pc.storage = &localFileOffsetStore{}
+               }
+               pc.storage.load()
+               go func() {
+                       // todo start clean msg expired
+                       // TODO quit
+                       for {
+                               pr := <-pc.prCh
+                               go func() {
+                                       rlog.Debugf("receive pull request: %s", pr.String())
+                                       pc.pullMessage(&pr)
+                               }()
+                       }
+               }()
+
+               if regErr := pc.client.RegisterConsumer(pc.consumerGroup, pc); regErr != nil {
+                       pc.state = kernel.StateCreateJust
+                       rlog.Errorf("the consumer group: [%s] has been created, specify another name.", pc.consumerGroup)
+                       // Keep the underlying cause instead of discarding it.
+                       err = fmt.Errorf("consumer group [%s] has been created: %v", pc.consumerGroup, regErr)
+                       return
+               }
+               pc.client.UpdateTopicRouteInfo()
+               pc.client.RebalanceImmediately()
+               pc.client.Start()
+               pc.state = kernel.StateRunning
+       })
+       if err != nil {
+               // Registration failed: do not drive the client any further.
+               return err
+       }
+
+       pc.client.UpdateTopicRouteInfo()
+       pc.client.RebalanceImmediately()
+       pc.client.CheckClientInBroker()
+       pc.client.SendHeartbeatToAllBrokerWithLock()
+       return nil
+}
+
+func (pc *pushConsumer) Shutdown() {} // currently a no-op: the body is empty, so nothing launched by Start is stopped here
+
+// Subscribe registers topic with selector and installs f as the consume callback.
+//
+// It is only accepted while the consumer is still in the kernel.StateCreateJust
+// state; otherwise an error is returned and nothing is changed. The callback is
+// stored in a single field, so the most recent call decides the handler used
+// for every subscribed topic.
+func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
+       f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error {
+       if created := pc.state == kernel.StateCreateJust; !created {
+               return errors.New("subscribe topic only started before")
+       }
+
+       pc.subscriptionDataTable.Store(topic, buildSubscriptionData(topic, selector))
+       pc.subscribedTopic[topic] = ""
+       pc.consume = f
+       return nil
+}
+
+// Rebalance runs the embedded defaultConsumer's doBalance.
+func (pc *pushConsumer) Rebalance() {
+       pc.doBalance()
+}
+
+// PersistConsumerOffset runs the embedded defaultConsumer's persistConsumerOffset.
+func (pc *pushConsumer) PersistConsumerOffset() {
+       pc.persistConsumerOffset()
+}
+
+// UpdateTopicSubscribeInfo forwards topic and mqs to the embedded
+// defaultConsumer's updateTopicSubscribeInfo.
+func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+       pc.updateTopicSubscribeInfo(topic, mqs)
+}
+
+// IsSubscribeTopicNeedUpdate returns the embedded defaultConsumer's
+// isSubscribeTopicNeedUpdate result for topic.
+func (pc *pushConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+       needUpdate := pc.isSubscribeTopicNeedUpdate(topic)
+       return needUpdate
+}
+
+// SubscriptionDataList returns the embedded defaultConsumer's subscription list.
+// The explicit defaultConsumer selector is required here: calling the promoted
+// name would recurse into this method.
+func (pc *pushConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
+       list := pc.defaultConsumer.SubscriptionDataList()
+       return list
+}
+
+// IsUnitMode returns the unitMode flag of the embedded defaultConsumer.
+func (pc *pushConsumer) IsUnitMode() bool {
+       return pc.defaultConsumer.unitMode
+}
+
+func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided 
[]*kernel.MessageQueue) {
+       // TODO: not implemented. NewPushConsumer installs this method as the defaultConsumer's mqChanged callback; topic, mqAll and mqDivided are currently ignored.
+}
+
+func (pc *pushConsumer) validate() {
+       kernel.ValidateGroup(pc.consumerGroup)
+
+       if pc.consumerGroup == kernel.DefaultConsumerGroup {
+               // TODO FQA
+               rlog.Fatalf("consumerGroup can't equal [%s], please specify 
another one.", kernel.DefaultConsumerGroup)
+       }
+
+       if len(pc.subscribedTopic) == 0 {
+               rlog.Fatal("number of subscribed topics is 0.")
+       }
+
+       if pc.option.ConsumeConcurrentlyMaxSpan < 1 || 
pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
+               if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
+                       pc.option.ConsumeConcurrentlyMaxSpan = 1000
+               } else {
+                       rlog.Fatal("option.ConsumeConcurrentlyMaxSpan out of 
range [1, 65535]")
+               }
+       }
+
+       if pc.option.PullThresholdForQueue < 1 || 
pc.option.PullThresholdForQueue > 65535 {
+               if pc.option.PullThresholdForQueue == 0 {
+                       pc.option.PullThresholdForQueue = 1024
+               } else {
+                       rlog.Fatal("option.PullThresholdForQueue out of range 
[1, 65535]")
+               }
+       }
+
+       if pc.option.PullThresholdForTopic < 1 || 
pc.option.PullThresholdForTopic > 6553500 {
+               if pc.option.PullThresholdForTopic == 0 {
+                       pc.option.PullThresholdForTopic = 102400
+               } else {
+                       rlog.Fatal("option.PullThresholdForTopic out of range 
[1, 6553500]")
+               }
+       }
+
+       if pc.option.PullThresholdSizeForQueue < 1 || 
pc.option.PullThresholdSizeForQueue > 1024 {
+               if pc.option.PullThresholdSizeForQueue == 0 {
+                       pc.option.PullThresholdSizeForQueue = 512
+               } else {
+                       rlog.Fatal("option.PullThresholdSizeForQueue out of 
range [1, 1024]")
+               }
+       }
+
+       if pc.option.PullThresholdSizeForTopic < 1 || 
pc.option.PullThresholdSizeForTopic > 102400 {
+               if pc.option.PullThresholdSizeForTopic == 0 {
+                       pc.option.PullThresholdSizeForTopic = 51200
+               } else {
+                       rlog.Fatal("option.PullThresholdSizeForTopic out of 
range [1, 102400]")
+               }
+       }
+
+       if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
+               rlog.Fatal("option.PullInterval out of range [0, 65535]")
+       }
+
+       if pc.option.ConsumeMessageBatchMaxSize < 1 || 
pc.option.ConsumeMessageBatchMaxSize > 1024 {
+               if pc.option.ConsumeMessageBatchMaxSize == 0 {
+                       pc.option.ConsumeMessageBatchMaxSize = 512
+               } else {
+                       rlog.Fatal("option.ConsumeMessageBatchMaxSize out of 
range [1, 1024]")
+               }
+       }
+
+       if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
+               if pc.option.PullBatchSize == 0 {
+                       pc.option.PullBatchSize = 1
+               } else {
+                       rlog.Fatal("option.PullBatchSize out of range [1, 
1024]")
+               }
+       }
+}
+
+// pullMessage runs the pull task of one PullRequest until its process queue is
+// marked as dropped.
+//
+// A companion goroutine keeps handing the process queue to submitToConsume and
+// stops once the queue is dropped. Each iteration of the pull loop:
+//   - sleeps for the delay chosen by the previous iteration (PullInterval by
+//     default, a longer delay after an error, a pause or flow control);
+//   - skips the pull when the consumer state is not OK, the consumer is paused,
+//     the cached message count/size or the offset span exceeds its threshold,
+//     or (orderly mode) the queue is not locked;
+//   - builds the pull request, resolves the broker and pulls;
+//   - caches the found messages and advances request.nextOffset according to
+//     the pull status.
+func (pc *pushConsumer) pullMessage(request *PullRequest) {
+       rlog.Infof("start a new Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
+       var sleepTime time.Duration
+       pq := request.pq
+       go func() {
+               // Stop feeding the consume routine once the queue is dropped so this
+               // goroutine does not outlive the pull task.
+               for !pq.dropped {
+                       pc.submitToConsume(request.pq, request.mq)
+               }
+       }()
+       for {
+               if pq.dropped {
+                       rlog.Infof("the request: [%s] was dropped, so stop task", request.String())
+                       return
+               }
+               if sleepTime > 0 {
+                       rlog.Infof("pull MessageQueue: %d sleep %d ms", request.mq.QueueId, sleepTime/time.Millisecond)
+                       time.Sleep(sleepTime)
+               }
+               // reset time
+               sleepTime = pc.option.PullInterval
+               pq.lastPullTime = time.Now()
+               err := pc.makeSureStateOK()
+               rlog.Debugf("pull MessageQueue: %d", request.mq.QueueId)
+               if err != nil {
+                       rlog.Warnf("consumer state error: %s", err.Error())
+                       sleepTime = _PullDelayTimeWhenError
+                       continue
+               }
+
+               if pc.pause {
+                       rlog.Infof("consumer [%s] of [%s] was paused, execute pull request [%s] later",
+                               pc.option.InstanceName, pc.consumerGroup, request.String())
+                       sleepTime = _PullDelayTimeWhenSuspend
+                       continue
+               }
+
+               cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
+               if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
+                       // Log only every 1000th hit to avoid flooding the log.
+                       if pc.queueFlowControlTimes%1000 == 0 {
+                               rlog.Warnf("the cached message count exceeds the threshold %d, so do flow control, "+
+                                       "minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
+                                       pc.option.PullThresholdForQueue,
+                                       pq.msgCache.Front().Value.(int64),
+                                       pq.msgCache.Back().Value.(int64),
+                                       pq.cachedMsgCount, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
+                       }
+                       pc.queueFlowControlTimes++
+                       sleepTime = _PullDelayTimeWhenFlowControl
+                       continue
+               }
+
+               if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
+                       if pc.queueFlowControlTimes%1000 == 0 {
+                               rlog.Warnf("the cached message size exceeds the threshold %d MiB, so do flow control, "+
+                                       "minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
+                                       pc.option.PullThresholdSizeForQueue,
+                                       0, // TODO processQueue.getMsgTreeMap().firstKey(),
+                                       0, // TODO processQueue.getMsgTreeMap().lastKey(),
+                                       pq.cachedMsgCount, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
+                       }
+                       pc.queueFlowControlTimes++
+                       sleepTime = _PullDelayTimeWhenFlowControl
+                       continue
+               }
+
+               if !pc.consumeOrderly {
+                       if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
+                               if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
+                                       rlog.Warnf("the queue's messages, span too long, so do flow control, minOffset=%d, "+
+                                               "maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d",
+                                               0, // TODO processQueue.getMsgTreeMap().firstKey(),
+                                               0, // TODO processQueue.getMsgTreeMap().lastKey(),
+                                               pq.getMaxSpan(),
+                                               request.String(), pc.queueMaxSpanFlowControlTimes)
+                               }
+                               // Count the hit; without this the warning above fires on every hit.
+                               pc.queueMaxSpanFlowControlTimes++
+                               sleepTime = _PullDelayTimeWhenFlowControl
+                               continue
+                       }
+               } else {
+                       if !pq.locked {
+                               rlog.Infof("pull message later because not locked in broker, [%s]", request.String())
+                               sleepTime = _PullDelayTimeWhenError
+                               continue
+                       }
+                       if !request.lockedFirst {
+                               offset := pc.computePullFromWhere(request.mq)
+                               brokerBusy := offset < request.nextOffset
+                               rlog.Infof("the first time to pull message, so fix offset from broker. "+
+                                       "pullRequest: [%s] NewOffset: %d brokerBusy: %v",
+                                       request.String(), offset, brokerBusy)
+                               if brokerBusy {
+                                       rlog.Infof("[NOTIFY_ME]the first time to pull message, but pull request offset"+
+                                               " larger than broker consume offset. pullRequest: [%s] NewOffset: %d",
+                                               request.String(), offset)
+                               }
+
+                               request.lockedFirst = true
+                               request.nextOffset = offset
+                       }
+               }
+
+               v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
+               if !exist {
+                       rlog.Warnf("find the consumer's subscription failed, %s", request.String())
+                       sleepTime = _PullDelayTimeWhenError
+                       continue
+               }
+               beginTime := time.Now()
+               var (
+                       commitOffsetEnable bool
+                       commitOffsetValue  int64
+                       subExpression      string
+               )
+
+               if pc.model == Clustering {
+                       commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
+                       if commitOffsetValue > 0 {
+                               commitOffsetEnable = true
+                       }
+               }
+
+               sd := v.(*kernel.SubscriptionData)
+               classFilter := sd.ClassFilterMode
+               if pc.option.PostSubscriptionWhenPull && classFilter {
+                       subExpression = sd.SubString
+               }
+
+               sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
+
+               pullRequest := &kernel.PullMessageRequest{
+                       ConsumerGroup: pc.consumerGroup,
+                       Topic:         request.mq.Topic,
+                       QueueId:       int32(request.mq.QueueId),
+                       QueueOffset:   request.nextOffset,
+                       MaxMsgNums:    pc.option.PullBatchSize,
+                       SysFlag:       sysFlag,
+                       // Send the offset read above; sysFlag already announces it
+                       // through commitOffsetEnable.
+                       CommitOffset:   commitOffsetValue,
+                       SubExpression:  _SubAll,
+                       ExpressionType: string(TAG), // TODO
+               }
+               //
+               //if data.ExpType == string(TAG) {
+               //      pullRequest.SubVersion = 0
+               //} else {
+               //      pullRequest.SubVersion = data.SubVersion
+               //}
+
+               //ch := make(chan *kernel.PullResult)
+               brokerResult := tryFindBroker(request.mq)
+               if brokerResult == nil {
+                       rlog.Warnf("no broker found for %s", request.mq.String())
+                       sleepTime = _PullDelayTimeWhenError
+                       continue
+               }
+               result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
+               if err != nil {
+                       rlog.Warnf("pull message from %s error: %s", brokerResult.BrokerAddr, err.Error())
+                       sleepTime = _PullDelayTimeWhenError
+                       continue
+               }
+
+               if result.Status == kernel.PullBrokerTimeout {
+                       rlog.Warnf("pull broker: %s timeout", brokerResult.BrokerAddr)
+                       sleepTime = _PullDelayTimeWhenError
+                       continue
+               }
+
+               switch result.Status {
+               case kernel.PullFound:
+                       rlog.Debugf("Topic: %s, QueueId: %d found messages: %d", request.mq.Topic, request.mq.QueueId,
+                               len(result.GetMessageExts()))
+                       prevRequestOffset := request.nextOffset
+                       request.nextOffset = result.NextBeginOffset
+
+                       rt := time.Now().Sub(beginTime)
+                       increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+
+                       msgFounded := result.GetMessageExts()
+                       firstMsgOffset := int64(math.MaxInt64)
+                       if len(msgFounded) != 0 {
+                               firstMsgOffset = msgFounded[0].QueueOffset
+                               increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
+                               pq.putMessage(msgFounded)
+                       }
+                       if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
+                               rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%d, "+
+                                       "firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
+                       }
+               case kernel.PullNoNewMsg:
+                       rlog.Infof("Topic: %s, QueueId: %d, no more msg", request.mq.Topic, request.mq.QueueId)
+               case kernel.PullNoMsgMatched:
+                       request.nextOffset = result.NextBeginOffset
+                       pc.correctTagsOffset(request)
+               case kernel.PullOffsetIllegal:
+                       rlog.Warnf("the pull request offset illegal, %s %s", request.String(), result.String())
+                       request.nextOffset = result.NextBeginOffset
+                       // Dropping the queue ends this task on the next iteration; the
+                       // corrected offset is stored after a grace period.
+                       pq.dropped = true
+                       go func() {
+                               time.Sleep(10 * time.Second)
+                               pc.storage.update(request.mq, request.nextOffset, false)
+                               pc.storage.persist([]*kernel.MessageQueue{request.mq})
+                               pc.storage.remove(request.mq)
+                               rlog.Warnf("fix the pull request offset: %s", request.String())
+                       }()
+               default:
+                       rlog.Warnf("unknown pull status: %v, pullRequest=%s", result.Status, request.String())
+                       sleepTime = _PullDelayTimeWhenError
+               }
+       }
+}
+
+func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
+       // TODO: not implemented. pullMessage calls this after a PullNoMsgMatched result; pr is currently ignored.
+}
+
+func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg 
*kernel.MessageExt) bool {
+       return true // stub: always reports success and sends nothing, so callers never see a failed send-back
+}
+
+type ConsumeMessageContext struct { // context value handed to the consume callback together with the messages
+       consumerGroup string
+       msgs          []*kernel.MessageExt
+       mq            *kernel.MessageQueue
+       success       bool
+       status        string
+       // mqTractContext - NOTE(review): looks like a placeholder for a trace context field that is not declared yet; confirm.
+       properties map[string]string // consumeMessageCurrently stores the outcome under the "ConsumeContextType" key
+}
+
+func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq 
*kernel.MessageQueue) {
+       msgs := pq.takeMessages(32)
+       if msgs == nil {
+               return
+       }
+       for count := 0; count < len(msgs); count++ {
+               var subMsgs []*kernel.MessageExt
+               if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
+                       subMsgs = msgs[count:]
+                       count = len(msgs)
+               } else {
+                       next := count + pc.option.ConsumeMessageBatchMaxSize
+                       subMsgs = msgs[count:next]
+                       count = next
+               }
+               go func() {
+               RETRY:
+                       if pq.dropped {
+                               rlog.Infof("the message queue not be able to 
consume, because it was dropped. group=%s, mq=%s",
+                                       pc.consumerGroup, mq.String())
+                               return
+                       }
+
+                       ctx := &ConsumeMessageContext{
+                               properties: make(map[string]string),
+                       }
+                       // TODO hook
+                       beginTime := time.Now()
+                       groupTopic := kernel.RetryGroupTopicPrefix + 
pc.consumerGroup
+                       for idx := range subMsgs {
+                               msg := subMsgs[idx]
+                               retryTopic := 
msg.Properties[kernel.PropertyRetryTopic]
+                               if retryTopic == "" && groupTopic == msg.Topic {
+                                       msg.Topic = retryTopic
+                               }
+                               
subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+                                       
beginTime.UnixNano()/int64(time.Millisecond), 10)
+                       }
+                       result, err := pc.consume(ctx, subMsgs)
+                       consumeRT := time.Now().Sub(beginTime)
+                       if err != nil {
+                               ctx.properties["ConsumeContextType"] = 
"EXCEPTION"
+                       } else if consumeRT >= pc.option.ConsumeTimeout {
+                               ctx.properties["ConsumeContextType"] = "TIMEOUT"
+                       } else if result == ConsumeSuccess {
+                               ctx.properties["ConsumeContextType"] = "SUCCESS"
+                       } else {
+                               ctx.properties["ConsumeContextType"] = 
"RECONSUME_LATER"
+                       }
+
+                       // TODO hook
+                       increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
+
+                       if !pq.dropped {
+                               msgBackFailed := make([]*kernel.MessageExt, 0)
+                               if result == ConsumeSuccess {
+                                       increaseConsumeOKTPS(pc.consumerGroup, 
mq.Topic, len(subMsgs))
+                               } else {
+                                       
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+                                       if pc.model == BroadCasting {
+                                               for i := 0; i < len(msgs); i++ {
+                                                       
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}", 
subMsgs[i])
+                                               }
+                                       } else {
+                                               for i := 0; i < len(msgs); i++ {
+                                                       msg := msgs[i]
+                                                       if 
!pc.sendMessageBack(ctx, msg) {
+                                                               
msg.ReconsumeTimes += 1
+                                                               msgBackFailed = 
append(msgBackFailed, msg)
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               offset := pq.removeMessage(len(subMsgs))
+
+                               if offset >= 0 && !pq.dropped {
+                                       pc.storage.update(mq, int64(offset), 
true)
+                               }
+                               if len(msgBackFailed) > 0 {
+                                       subMsgs = msgBackFailed
+                                       time.Sleep(5 * time.Second)
+                                       goto RETRY
+                               }
+                       } else {
+                               rlog.Warnf("processQueue is dropped without 
process consume result. messageQueue=%s, msgs=%+v",
+                                       mq, msgs)
+                       }
+               }()
+       }
+}
+
+func (pc *pushConsumer) consumeMessageOrderly(pq *ProcessQueue, mq 
*kernel.MessageQueue) { // empty stub: orderly consumption does nothing yet; NewPushConsumer selects it when consumeOrderly is set
+}
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
new file mode 100644
index 0000000..f7bc454
--- /dev/null
+++ b/consumer/push_consumer_test.go
@@ -0,0 +1,18 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
diff --git a/consumer/statistics.go b/consumer/statistics.go
new file mode 100644
index 0000000..29045a0
--- /dev/null
+++ b/consumer/statistics.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import "time"
+
+// Statistic sets kept by the consumer. Each statsName matches the metric the
+// variable is named after.
+var (
+       topicAndGroupConsumeOKTPS     = &statsItemSet{statsName: "CONSUME_OK_TPS"}
+       topicAndGroupConsumeRT        = &statsItemSet{statsName: "CONSUME_RT"}
+       topicAndGroupConsumeFailedTPS = &statsItemSet{statsName: "CONSUME_FAILED_TPS"}
+       topicAndGroupPullTPS          = &statsItemSet{statsName: "PULL_TPS"}
+       topicAndGroupPullRT           = &statsItemSet{statsName: "PULL_RT"}
+)
+
+type statsItem struct { // placeholder: no fields are declared yet
+}
+
+type statsItemSet struct { // a named collection of statsItem values
+       statsName      string               // metric name, e.g. "PULL_TPS"
+       statsItemTable map[string]statsItem // items by string key; NOTE(review): presumably the key passed to addValue - confirm
+}
+
+func (set *statsItemSet) addValue(key string, incValue, incTimes int) {
+       // TODO: not implemented; key, incValue and incTimes are currently ignored.
+}
+
+func increasePullRT(group, topic string, rt time.Duration) {
+       // TODO: not implemented; pullMessage calls this with the round-trip time of a pull that found messages.
+}
+
+func increaseConsumeRT(group, topic string, rt time.Duration) {
+       // TODO: not implemented; consumeMessageCurrently calls this with the duration of a consume callback.
+}
+
+func increasePullTPS(group, topic string, msgNumber int) {
+       // TODO: not implemented; pullMessage calls this with the number of messages found by a pull.
+}
+
+func increaseConsumeOKTPS(group, topic string, msgNumber int) {
+       // TODO: not implemented; consumeMessageCurrently calls this with the size of a successfully consumed batch.
+}
+
+func increaseConsumeFailedTPS(group, topic string, msgNumber int) {
+       // TODO: not implemented; consumeMessageCurrently calls this with the size of a batch that was not consumed successfully.
+}
diff --git a/consumer/strategy.go b/consumer/strategy.go
new file mode 100644
index 0000000..1cb6b2b
--- /dev/null
+++ b/consumer/strategy.go
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+)
+
+// AllocateStrategy names the algorithm used to allocate message queues among the consumers of a group.
+type AllocateStrategy string
+
+const (
+       // StrategyMachineNearby is a proxy allocate strategy that gives priority to
+       // consumers in the same machine room as the broker. An actual allocate
+       // strategy can be specified.
+       //
+       // If any consumer is alive in a machine room, the message queues of the
+       // brokers deployed in that machine room should only be allocated to those
+       // consumers. Otherwise, those message queues can be shared among all
+       // consumers, since there is no alive consumer to monopolize them.
+       StrategyMachineNearby = AllocateStrategy("MachineNearby")
+
+       // Average Hashing queue algorithm
+       StrategyAveragely = AllocateStrategy("Averagely")
+
+       // Cycle average Hashing queue algorithm
+       StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")
+
+       // Use Message Queue specified
+       StrategyConfig = AllocateStrategy("Config")
+
+       // Computer room Hashing queue algorithm, such as Alipay logic room
+       StrategyMachineRoom = AllocateStrategy("MachineRoom")
+
+       // Consistent Hashing queue algorithm
+       StrategyConsistentHash = AllocateStrategy("ConsistentHash")
+)
+
+// allocateByAveragely splits mqAll evenly among the consumers listed in cidAll
+// and returns the contiguous run of queues that belongs to currentCID, based on
+// the position of currentCID within cidAll.
+//
+// When the queues do not divide evenly, the first len(mqAll)%len(cidAll)
+// consumers each receive one extra queue. It returns nil when currentCID is
+// empty, when utils.IsArrayEmpty reports mqAll or cidAll as empty, or when
+// currentCID is not found in cidAll. A consumer positioned beyond the available
+// queues receives an empty, non-nil slice.
+func allocateByAveragely(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+       cidAll []string) []*kernel.MessageQueue {
+       if currentCID == "" || utils.IsArrayEmpty(mqAll) || utils.IsArrayEmpty(cidAll) {
+               return nil
+       }
+       var (
+               find  bool
+               index int
+       )
+
+       // Locate this consumer's position in the consumer ID list.
+       for idx := range cidAll {
+               if cidAll[idx] == currentCID {
+                       find = true
+                       index = idx
+                       break
+               }
+       }
+       if !find {
+               rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in cidAll:%+v", consumerGroup, currentCID, cidAll)
+               return nil
+       }
+
+       mqSize := len(mqAll)
+       cidSize := len(cidAll)
+       mod := mqSize % cidSize
+
+       var averageSize int
+       if mqSize <= cidSize {
+               averageSize = 1
+       } else {
+               if mod > 0 && index < mod {
+                       averageSize = mqSize/cidSize + 1
+               } else {
+                       averageSize = mqSize / cidSize
+               }
+       }
+
+       var startIndex int
+       if mod > 0 && index < mod {
+               startIndex = index * averageSize
+       } else {
+               startIndex = index*averageSize + mod
+       }
+
+       // With more consumers than queues, startIndex can exceed mqSize and the
+       // remaining count becomes negative. make panics on a negative length, so
+       // clamp to zero and give this consumer an empty allocation.
+       num := utils.MinInt(averageSize, mqSize-startIndex)
+       if num < 0 {
+               num = 0
+       }
+       result := make([]*kernel.MessageQueue, num)
+       for i := 0; i < num; i++ {
+               result[i] = mqAll[(startIndex+i)%mqSize]
+       }
+       return result
+}
+
+// TODO: implement the dedicated strategies below; for now each one delegates to allocateByAveragely.
+func allocateByMachineNearby(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
+       cidAll []string) []*kernel.MessageQueue {
+       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByAveragelyCircle(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
+       cidAll []string) []*kernel.MessageQueue {
+       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByConfig(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
+       cidAll []string) []*kernel.MessageQueue {
+       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByMachineRoom(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
+       cidAll []string) []*kernel.MessageQueue {
+       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByConsistentHash(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
+       cidAll []string) []*kernel.MessageQueue {
+       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
diff --git a/consumer_test.go b/consumer_test.go
deleted file mode 100644
index aaba3cb..0000000
--- a/consumer_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package rocketmq
-
-import (
-       "fmt"
-       "testing"
-)
-
-func TestDefaultConsumer_Pull(t *testing.T) {
-       consumer := NewConsumer(ConsumerConfig{
-               GroupName: "testGroup",
-       })
-       consumer.Start()
-       result, err := consumer.Pull("test", "*", 32)
-       if err != nil {
-               t.Fatal(err.Error())
-       }
-       fmt.Println(len(result.GetMessageExts()))
-}
diff --git a/examples/main.go b/examples/main.go
deleted file mode 100644
index 72a2a68..0000000
--- a/examples/main.go
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "github.com/apache/rocketmq-client-go/core"
-       "gopkg.in/alecthomas/kingpin.v2"
-       "os"
-)
-
-var (
-       rmq     = kingpin.New("rocketmq", "RocketMQ cmd tools")
-       namesrv = rmq.Flag("namesrv", "NameServer 
address.").Default("localhost:9876").Short('n').String()
-       topic   = rmq.Flag("topic", "topic 
name.").Short('t').Required().String()
-       gid     = rmq.Flag("groupId", "group 
Id").Short('g').Default("testGroup").String()
-       amount  = rmq.Flag("amount", "how many message to produce or 
consume").Default("64").Short('a').Int()
-
-       produce     = rmq.Command("produce", "send messages to RocketMQ")
-       body        = produce.Flag("body", "message 
body").Short('b').Required().String()
-       workerCount = produce.Flag("workerCount", "works of send message with 
orderly").Default("1").Short('w').Int()
-       orderly     = produce.Flag("orderly", "send msg 
orderly").Short('o').Bool()
-
-       consume = rmq.Command("consume", "consumes message from RocketMQ")
-)
-
-func main() {
-       switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
-       case produce.FullCommand():
-               pConfig := &rocketmq.ProducerConfig{ClientConfig: 
rocketmq.ClientConfig{
-                       GroupID:    *gid,
-                       NameServer: *namesrv,
-                       LogC: &rocketmq.LogConfig{
-                               Path:     "example",
-                               FileSize: 64 * 1 << 10,
-                               FileNum:  1,
-                               Level:    rocketmq.LogLevelDebug,
-                       },
-               }}
-               if *orderly {
-                       sendMessageOrderly(pConfig)
-               } else {
-                       sendMessage(pConfig)
-               }
-       case consume.FullCommand():
-               cConfig := &rocketmq.PushConsumerConfig{ClientConfig: 
rocketmq.ClientConfig{
-                       GroupID:    *gid,
-                       NameServer: *namesrv,
-                       LogC: &rocketmq.LogConfig{
-                               Path:     "example",
-                               FileSize: 64 * 1 << 10,
-                               FileNum:  1,
-                               Level:    rocketmq.LogLevelInfo,
-                       },
-               }}
-
-               ConsumeWithPush(cConfig)
-       }
-}
diff --git a/examples/producer.go b/examples/producer.go
deleted file mode 100644
index e1c4d2f..0000000
--- a/examples/producer.go
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/core"
-)
-
-func sendMessage(config *rocketmq.ProducerConfig) {
-       producer, err := rocketmq.NewProducer(config)
-
-       if err != nil {
-               fmt.Println("create Producer failed, error:", err)
-               return
-       }
-
-       err = producer.Start()
-       if err != nil {
-               fmt.Println("start producer error", err)
-               return
-       }
-       defer producer.Shutdown()
-
-       fmt.Printf("Producer: %s started... \n", producer)
-       for i := 0; i < *amount; i++ {
-               msg := fmt.Sprintf("%s-%d", *body, i)
-               result, err := 
producer.SendMessageSync(&rocketmq.Message{Topic: *topic, Body: msg})
-               if err != nil {
-                       fmt.Println("Error:", err)
-               }
-               fmt.Printf("send message: %s result: %s\n", msg, result)
-       }
-       fmt.Println("shutdown producer.")
-}
diff --git a/examples/producer/main.go b/examples/producer/main.go
new file mode 100644
index 0000000..5c12584
--- /dev/null
+++ b/examples/producer/main.go
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "fmt"
+       "github.com/apache/rocketmq-client-go/consumer"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "os"
+       "time"
+)
+
+func main() {
+       c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+               ConsumerModel: consumer.Clustering,
+               FromWhere:     consumer.ConsumeFromFirstOffset,
+       })
+       err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
+               msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+               fmt.Println(msgs)
+               return consumer.ConsumeSuccess, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+}
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
deleted file mode 100644
index 9943f5b..0000000
--- a/examples/producer_orderly.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package main
-
-import (
-       "fmt"
-       "sync"
-       "sync/atomic"
-
-       "github.com/apache/rocketmq-client-go/core"
-)
-
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg 
interface{}) int {
-       return arg.(int) % size
-}
-
-type worker struct {
-       p            rocketmq.Producer
-       leftMsgCount int64
-}
-
-func (w *worker) run() {
-       selector := queueSelectorByOrderID{}
-       for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
-               r, err := w.p.SendMessageOrderly(
-                       &rocketmq.Message{Topic: *topic, Body: *body}, 
selector, 7 /*orderID*/, 3,
-               )
-               if err != nil {
-                       println("Send Orderly Error:", err)
-               }
-               fmt.Printf("send orderly result:%+v\n", r)
-       }
-}
-
-func sendMessageOrderly(config *rocketmq.ProducerConfig) {
-       producer, err := rocketmq.NewProducer(config)
-       if err != nil {
-               fmt.Println("create Producer failed, error:", err)
-               return
-       }
-
-       producer.Start()
-       defer producer.Shutdown()
-
-       wg := sync.WaitGroup{}
-       wg.Add(*workerCount)
-
-       workers := make([]worker, *workerCount)
-       for i := range workers {
-               workers[i].p = producer
-               workers[i].leftMsgCount = (int64)(*amount)
-       }
-
-       for i := range workers {
-               go func(w *worker) { w.run(); wg.Done() }(&workers[i])
-       }
-
-       wg.Wait()
-}
diff --git a/examples/pull_consumer.go b/examples/pull_consumer.go
deleted file mode 100644
index 1b209c0..0000000
--- a/examples/pull_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "time"
-
-       "github.com/apache/rocketmq-client-go/core"
-)
-
-func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
-
-       consumer, err := rocketmq.NewPullConsumer(config)
-       if err != nil {
-               fmt.Printf("new pull consumer error:%s\n", err)
-               return
-       }
-
-       err = consumer.Start()
-       if err != nil {
-               fmt.Printf("start consumer error:%s\n", err)
-               return
-       }
-       defer consumer.Shutdown()
-
-       mqs := consumer.FetchSubscriptionMessageQueues(topic)
-       fmt.Printf("fetch subscription mqs:%+v\n", mqs)
-
-       total, offsets, now := 0, map[int]int64{}, time.Now()
-
-PULL:
-       for {
-               for _, mq := range mqs {
-                       pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
-                       total += len(pr.Messages)
-                       fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
-
-                       switch pr.Status {
-                       case rocketmq.PullNoNewMsg:
-                               break PULL
-                       case rocketmq.PullFound:
-                               fallthrough
-                       case rocketmq.PullNoMatchedMsg:
-                               fallthrough
-                       case rocketmq.PullOffsetIllegal:
-                               offsets[mq.ID] = pr.NextBeginOffset
-                       case rocketmq.PullBrokerTimeout:
-                               fmt.Println("broker timeout occur")
-                       }
-               }
-       }
-
-       var timePerMessage time.Duration
-       if total > 0 {
-               timePerMessage = time.Since(now) / time.Duration(total)
-       }
-       fmt.Printf("total message:%d, per message time:%d\n", total, 
timePerMessage)
-}
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
deleted file mode 100644
index 38e434c..0000000
--- a/examples/push_consumer.go
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/core"
-       "sync/atomic"
-)
-
-func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
-
-       consumer, err := rocketmq.NewPushConsumer(config)
-       if err != nil {
-               println("create Consumer failed, error:", err)
-               return
-       }
-
-       ch := make(chan interface{})
-       var count = (int64)(*amount)
-       // MUST subscribe topic before consumer started.
-       consumer.Subscribe(*topic, "*", func(msg *rocketmq.MessageExt) 
rocketmq.ConsumeStatus {
-               fmt.Printf("A message received: \"%s\" \n", msg.Body)
-               if atomic.AddInt64(&count, -1) <= 0 {
-                       ch <- "quit"
-               }
-               return rocketmq.ConsumeSuccess
-       })
-
-       err = consumer.Start()
-       if err != nil {
-               println("consumer start failed,", err)
-               return
-       }
-
-       fmt.Printf("consumer: %s started...\n", consumer)
-       <-ch
-       err = consumer.Shutdown()
-       if err != nil {
-               println("consumer shutdown failed")
-               return
-       }
-       println("consumer has shutdown.")
-}
diff --git a/go.mod b/go.mod
index 2701b7c..58ca7c7 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.11
 require (
        github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // 
indirect
        github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // 
indirect
+       github.com/emirpasic/gods v1.12.0
        github.com/sirupsen/logrus v1.3.0
        github.com/stretchr/testify v1.3.0
        github.com/tidwall/gjson v1.2.1
diff --git a/go.sum b/go.sum
index 7a45ece..85b0d5e 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,8 @@ github.com/alecthomas/units 
v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
diff --git a/kernel/client.go b/kernel/client.go
index f9185ac..e83d598 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -23,34 +23,58 @@ import (
        "fmt"
        "github.com/apache/rocketmq-client-go/remote"
        "github.com/apache/rocketmq-client-go/rlog"
-       "github.com/apache/rocketmq-client-go/utils"
        "os"
        "strconv"
+       "strings"
        "sync"
        "time"
 )
 
 const (
        defaultTraceRegionID = "DefaultRegion"
-       tranceOff            = "false"
-)
 
-var (
-       namesrvAddrs                  = os.Getenv("rocketmq.namesrv.addr")
-       clientIP                      = utils.LocalIP()
-       instanceName                  = os.Getenv("rocketmq.client.name")
-       pollNameServerInterval        = 30 * time.Second
-       heartbeatBrokerInterval       = 30 * time.Second
-       persistConsumerOffsetInterval = 5 * time.Second
-       unitMode                      = false
-       vipChannelEnabled, _          = 
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
-       clientID                      = string(clientIP) + "@" + instanceName
+       // tracing message switch
+       _TranceOff = "false"
+
+       // Interval for pulling topic route information from the name server
+       _PullNameServerInterval = 30 * time.Second
+
+       // Interval for sending heartbeats to the brokers
+       _HeartbeatBrokerInterval = 30 * time.Second
+
+       // Offset persistent interval for consumer
+       _PersistOffset = 5 * time.Second
+
+       // Rebalance interval
+       _RebalanceInterval = 20 * time.Millisecond
 )
 
 var (
-       ErrServiceState = errors.New("service state is not Running, please 
check")
+       ErrServiceState = errors.New("service state is not running, please 
check")
 )
 
+type ClientOption struct {
+       NameServerAddr    string
+       ClientIP          string
+       InstanceName      string
+       UnitMode          bool
+       UnitName          string
+       VIPChannelEnabled bool
+       UseTLS            bool
+}
+
+// ChangeInstanceNameToPID replaces the placeholder instance name "DEFAULT" with
+// the current process ID. Any other instance name is left unchanged.
+func (opt *ClientOption) ChangeInstanceNameToPID() {
+       if opt.InstanceName == "DEFAULT" {
+               // os.Getpid, not os.Getegid: the latter returns the effective group ID,
+               // which is shared by every process of the same group.
+               opt.InstanceName = strconv.Itoa(os.Getpid())
+       }
+}
+
+func (opt *ClientOption) String() string {
+       return fmt.Sprintf("ClientOption [NameServerAddr=%s, ClientIP=%s, 
InstanceName=%s, "+
+               "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", 
opt.NameServerAddr, opt.ClientIP,
+               opt.InstanceName, opt.UnitMode, opt.UnitName, 
opt.VIPChannelEnabled, opt.UseTLS)
+}
+
 type InnerProducer interface {
        PublishTopicList() []string
        UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -62,15 +86,169 @@ type InnerProducer interface {
 }
 
 type InnerConsumer interface {
-       DoRebalance()
        PersistConsumerOffset()
        UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
        IsSubscribeTopicNeedUpdate(topic string) bool
+       SubscriptionDataList() []*SubscriptionData
+       Rebalance()
        IsUnitMode() bool
 }
 
+type RMQClient struct {
+       option ClientOption
+       // group -> InnerProducer
+       producerMap sync.Map
+
+       // group -> InnerConsumer
+       consumerMap sync.Map
+}
+
+var clientMap sync.Map
+
+func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
+       // TODO
+       return &RMQClient{option: option}
+}
+
+func (c *RMQClient) Start() {
+       // TODO fetchNameServerAddr
+       go func() {}()
+
+       // schedule update route info
+       go func() {
+               // delay
+               time.Sleep(50 * time.Millisecond)
+               for {
+                       c.UpdateTopicRouteInfo()
+                       time.Sleep(_PullNameServerInterval)
+               }
+       }()
+
+       // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
+       go func() {}()
+
+       // schedule persist offset
+       go func() {
+               time.Sleep(10 * time.Second)
+               for {
+                       c.consumerMap.Range(func(key, value interface{}) bool {
+                               consumer := value.(InnerConsumer)
+                               consumer.PersistConsumerOffset()
+                               return true
+                       })
+                       time.Sleep(_PersistOffset)
+               }
+       }()
+
+       go func() {
+               for {
+                       c.RebalanceImmediately()
+                       time.Sleep(time.Second)
+               }
+       }()
+}
+
+// ClientID returns the identifier of this client in the form
+// "<ClientIP>@<InstanceName>", with "@<UnitName>" appended when a unit name is
+// configured. It is derived from the client's options rather than a fixed
+// string, so clients with different options get different IDs.
+func (c *RMQClient) ClientID() string {
+       id := c.option.ClientIP + "@" + c.option.InstanceName
+       if c.option.UnitName != "" {
+               id += "@" + c.option.UnitName
+       }
+       return id
+}
+
+func (c *RMQClient) CheckClientInBroker() {
+
+}
+
+// SendHeartbeatToAllBrokerWithLock builds one heartbeat payload describing every
+// registered producer and consumer and sends it synchronously to each address of
+// each broker found in brokerAddressesMap. When a broker answers with
+// ResSuccess, its reported version is recorded in brokerVersionMap under
+// brokerName -> address. Nothing is sent when neither a producer nor a consumer
+// is registered.
+func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+       hbData := &heartbeatData{
+               ClientId: c.ClientID(),
+       }
+       pData := make([]producerData, 0)
+       c.producerMap.Range(func(key, value interface{}) bool {
+               pData = append(pData, producerData(key.(string)))
+               return true
+       })
+
+       cData := make([]consumerData, 0)
+       c.consumerMap.Range(func(key, value interface{}) bool {
+               consumer := value.(InnerConsumer)
+               cData = append(cData, consumerData{
+                       GroupName:         key.(string),
+                       CType:             "PUSH",
+                       MessageModel:      "CLUSTERING",
+                       Where:             "CONSUME_FROM_FIRST_OFFSET",
+                       UnitMode:          consumer.IsUnitMode(),
+                       SubscriptionDatas: consumer.SubscriptionDataList(),
+               })
+               return true
+       })
+       hbData.ProducerDatas = pData
+       hbData.ConsumerDatas = cData
+       if len(pData) == 0 && len(cData) == 0 {
+               rlog.Warn("sending heartbeat, but no consumer and no producer")
+               return
+       }
+       brokerAddressesMap.Range(func(key, value interface{}) bool {
+               brokerName := key.(string)
+               data := value.(*BrokerData)
+               for id, addr := range data.BrokerAddresses {
+                       cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
+                       response, err := remote.InvokeSync(addr, cmd, 3*time.Second)
+                       if err != nil {
+                               rlog.Warnf("send heart beat to broker error: %s", err.Error())
+                               // One unreachable address must not skip the remaining
+                               // addresses of the same broker.
+                               continue
+                       }
+                       if response.Code == ResSuccess {
+                               v, exist := brokerVersionMap.Load(brokerName)
+                               var m map[string]int32
+                               if exist {
+                                       m = v.(map[string]int32)
+                               } else {
+                                       m = make(map[string]int32, 4)
+                                       brokerVersionMap.Store(brokerName, m)
+                               }
+                               // The inner map is keyed by broker address; keying it by
+                               // brokerName would make every address overwrite one entry.
+                               m[addr] = int32(response.Version)
+                               rlog.Infof("send heart beat to broker[%s %s %s] success", brokerName, id, addr)
+                       }
+               }
+               return true
+       })
+}
+
+func (c *RMQClient) UpdateTopicRouteInfo() {
+       publishTopicSet := make(map[string]bool, 0)
+       c.producerMap.Range(func(key, value interface{}) bool {
+               producer := value.(InnerProducer)
+               list := producer.PublishTopicList()
+               for idx := range list {
+                       publishTopicSet[list[idx]] = true
+               }
+               return true
+       })
+       for topic := range publishTopicSet {
+               c.UpdatePublishInfo(topic, UpdateTopicRouteInfo(topic))
+       }
+
+       subscribedTopicSet := make(map[string]bool, 0)
+       c.consumerMap.Range(func(key, value interface{}) bool {
+               consumer := value.(InnerConsumer)
+               list := consumer.SubscriptionDataList()
+               for idx := range list {
+                       if !strings.HasPrefix(list[idx].Topic, 
RetryGroupTopicPrefix) {
+                               subscribedTopicSet[list[idx].Topic] = true
+                       }
+               }
+               return true
+       })
+
+       for topic := range subscribedTopicSet {
+               c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
+       }
+}
+
 // SendMessage with batch by sync
-func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
+func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
        response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
@@ -79,16 +257,16 @@ func SendMessageSync(ctx context.Context, brokerAddrs, 
brokerName string, reques
                return nil, err
        }
 
-       return processSendResponse(brokerName, msgs, response), nil
+       return c.processSendResponse(brokerName, msgs, response), nil
 }
 
 // SendMessageAsync send message with batch by async
-func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
+func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
        msgs []*Message, f func(result *SendResult)) error {
        return nil
 }
 
-func SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest,
+func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
        err := remote.InvokeOneWay(brokerAddrs, cmd)
@@ -98,7 +276,7 @@ func SendMessageOneWay(ctx context.Context, brokerAddrs 
string, request *SendMes
        return nil, err
 }
 
-func processSendResponse(brokerName string, msgs []*Message, cmd 
*remote.RemotingCommand) *SendResult {
+func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message, 
cmd *remote.RemotingCommand) *SendResult {
        var status SendStatus
        switch cmd.Code {
        case ResFlushDiskTimeout:
@@ -118,11 +296,11 @@ func processSendResponse(brokerName string, msgs 
[]*Message, cmd *remote.Remotin
 
        msgIDs := make([]string, 0)
        for i := 0; i < len(msgs); i++ {
-               msgIDs = append(msgIDs, 
msgs[i].Properties[UniqueClientMessageIdKeyIndex])
+               msgIDs = append(msgIDs, 
msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
        }
 
-       regionId := cmd.ExtFields[MsgRegion]
-       trace := cmd.ExtFields[TraceSwitch]
+       regionId := cmd.ExtFields[PropertyMsgRegion]
+       trace := cmd.ExtFields[PropertyTraceSwitch]
 
        if regionId == "" {
                regionId = defaultTraceRegionID
@@ -140,23 +318,22 @@ func processSendResponse(brokerName string, msgs 
[]*Message, cmd *remote.Remotin
                QueueOffset:   sendResponse.QueueOffset,
                TransactionID: sendResponse.TransactionId,
                RegionID:      regionId,
-               TraceOn:       trace != "" && trace != tranceOff,
+               TraceOn:       trace != "" && trace != _TranceOff,
        }
 }
 
 // PullMessage with sync
-func PullMessage(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest) (*PullResult, error) {
+func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-
        res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                return nil, err
        }
 
-       return processPullResponse(res)
+       return c.processPullResponse(res)
 }
 
-func processPullResponse(response *remote.RemotingCommand) (*PullResult, 
error) {
+func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) 
(*PullResult, error) {
        pullResult := &PullResult{}
        switch response.Code {
        case ResSuccess:
@@ -164,7 +341,7 @@ func processPullResponse(response *remote.RemotingCommand) 
(*PullResult, error)
        case ResPullNotFound:
                pullResult.Status = PullNoNewMsg
        case ResPullRetryImmediately:
-               pullResult.Status = PullNoMatchedMsg
+               pullResult.Status = PullNoMsgMatched
        case ResPullOffsetMoved:
                pullResult.Status = PullOffsetIllegal
        default:
@@ -197,70 +374,128 @@ func processPullResponse(response 
*remote.RemotingCommand) (*PullResult, error)
 }
 
 // PullMessageAsync pull message async
-func PullMessageAsync(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest, f func(result *PullResult)) error {
+func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest, f func(result *PullResult)) error {
        return nil
 }
 
 // QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(topic string, queueId int) error {
-       return nil
+func QueryMaxOffset(topic string, queueId int) (int64, error) {
+       return 0, nil
 }
 
 // QueryConsumerOffset with specific queueId and topic of consumerGroup
-func QueryConsumerOffset(consumerGroup, topic string, queue int) (int64, 
error) {
+func (c *RMQClient) QueryConsumerOffset(consumerGroup, topic string, queue 
int) (int64, error) {
        return 0, nil
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
-func SearchOffsetByTimestamp(topic string, queue int, timestamp int64) (int64, 
error) {
+func (c *RMQClient) SearchOffsetByTimestamp(topic string, queue int, timestamp 
int64) (int64, error) {
        return 0, nil
 }
 
 // UpdateConsumerOffset with specific queueId and topic
-func UpdateConsumerOffset(consumerGroup, topic string, queue int, offset 
int64) error {
+func (c *RMQClient) UpdateConsumerOffset(consumerGroup, topic string, queue 
int, offset int64) error {
        return nil
 }
 
-var (
-       // group -> InnerProducer
-       producerMap sync.Map
-
-       // group -> InnerConsumer
-       consumerMap sync.Map
-)
-
-func CheckClientInBroker() {
+// RegisterConsumer stores consumer in the client's consumer map under the key
+// group. The current implementation cannot fail and always returns nil.
+func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
+       c.consumerMap.Store(group, consumer)
+       return nil
+}
 
+func (c *RMQClient) UnregisterConsumer(group string) {
 }
 
-func RegisterConsumer(group string, consumer InnerConsumer) {
+func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+}
 
+func (c *RMQClient) UnregisterProducer(group string) {
 }
 
-func UnregisterConsumer(group string) {
+func (c *RMQClient) SelectProducer(group string) InnerProducer {
+       return nil
+}
 
+func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
+       return nil
 }
 
-func RegisterProducer(group string, producer InnerProducer) {
+// RebalanceImmediately walks the client's consumer map and calls Rebalance on
+// every registered consumer, one after another, in the calling goroutine.
+func (c *RMQClient) RebalanceImmediately() {
+       c.consumerMap.Range(func(_, v interface{}) bool {
+               v.(InnerConsumer).Rebalance()
+               // Returning true keeps Range going so no consumer is skipped.
+               return true
+       })
+}
 
+// UpdatePublishInfo converts data into a TopicPublishInfo and hands it to
+// every registered producer, provided at least one producer reports (through
+// IsPublishTopicNeedUpdate) that its publish info for topic needs updating.
+//
+// A nil data is ignored: UpdateTopicRouteInfo returns nil when the route
+// lookup fails, and in that case there is no route to publish.
+func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+       if data == nil || !c.isNeedUpdatePublishInfo(topic) {
+               return
+       }
+       c.producerMap.Range(func(key, value interface{}) bool {
+               producer := value.(InnerProducer)
+               // Build the publish info inside the loop so that producers do not
+               // share a single TopicPublishInfo instance.
+               publishInfo := routeData2PublishInfo(topic, data)
+               publishInfo.HaveTopicRouterInfo = true
+               producer.UpdateTopicPublishInfo(topic, publishInfo)
+               return true
+       })
 }
 
-func UnregisterProducer(group string) {
+// isNeedUpdatePublishInfo reports whether any registered producer says,
+// through IsPublishTopicNeedUpdate, that its publish info for topic needs
+// updating. Iteration stops at the first producer that answers true.
+func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
+       needUpdate := false
+       c.producerMap.Range(func(_, v interface{}) bool {
+               needUpdate = v.(InnerProducer).IsPublishTopicNeedUpdate(topic)
+               // Keep iterating only while no producer has asked for an update.
+               return !needUpdate
+       })
+       return needUpdate
+}
 
+// UpdateSubscribeInfo converts data into the list of readable message queues
+// for topic and hands that list to every registered consumer, provided at
+// least one consumer reports (through IsSubscribeTopicNeedUpdate) that its
+// subscribe info for topic needs updating.
+//
+// A nil data is ignored. Callers pass the result of UpdateTopicRouteInfo
+// straight in, and that function returns nil when the route lookup fails;
+// routeData2SubscribeInfo dereferences data, so without this guard a failed
+// lookup would end in a nil-pointer panic.
+func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+       if data == nil || !c.isNeedUpdateSubscribeInfo(topic) {
+               return
+       }
+       c.consumerMap.Range(func(key, value interface{}) bool {
+               consumer := value.(InnerConsumer)
+               // TODO
+               // The queue list is rebuilt per consumer so that consumers never
+               // share one backing slice.
+               queues := routeData2SubscribeInfo(topic, data)
+               consumer.UpdateTopicSubscribeInfo(topic, queues)
+               return true
+       })
 }
 
-func SelectProducer(group string) InnerProducer {
-       return nil
+// isNeedUpdateSubscribeInfo reports whether any registered consumer says,
+// through IsSubscribeTopicNeedUpdate, that its subscribe info for topic needs
+// updating. Iteration stops at the first consumer that answers true.
+func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
+       needUpdate := false
+       c.consumerMap.Range(func(_, v interface{}) bool {
+               needUpdate = v.(InnerConsumer).IsSubscribeTopicNeedUpdate(topic)
+               // Keep iterating only while no consumer has asked for an update.
+               return !needUpdate
+       })
+       return needUpdate
 }
 
-func SelectConsumer(group string) InnerConsumer {
-       return nil
+// routeData2SubscribeInfo expands data into one MessageQueue per readable
+// queue. For every entry of data.QueueDataList whose Perm passes
+// queueIsReadable it emits ReadQueueNums queues with ids 0..ReadQueueNums-1,
+// each carrying topic and the entry's BrokerName.
+//
+// The result is never nil. A nil data, or nil entries inside QueueDataList,
+// contribute no queues instead of causing a nil-pointer panic.
+func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue {
+       list := make([]*MessageQueue, 0)
+       if data == nil {
+               return list
+       }
+       for _, qd := range data.QueueDataList {
+               if qd == nil || !queueIsReadable(qd.Perm) {
+                       continue
+               }
+               for i := 0; i < qd.ReadQueueNums; i++ {
+                       list = append(list, &MessageQueue{
+                               Topic:      topic,
+                               BrokerName: qd.BrokerName,
+                               QueueId:    i,
+                       })
+               }
+       }
+       return list
 }
 
 func encodeMessages(message []*Message) []byte {
        return nil
 }
-
-func sendHeartbeatToAllBroker() {
-
-}
diff --git a/kernel/client_test.go b/kernel/client_test.go
new file mode 100644
index 0000000..7a9723c
--- /dev/null
+++ b/kernel/client_test.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+       "context"
+       "testing"
+)
+
+// TestRMQClient_PullMessage is an integration test: it issues one pull request
+// against a broker at 127.0.0.1:10911 and logs the body of every message
+// returned. It needs that broker to be running, so it is skipped when tests
+// are run with -short.
+func TestRMQClient_PullMessage(t *testing.T) {
+       if testing.Short() {
+               t.Skip("skipping: requires a broker listening on 127.0.0.1:10911")
+       }
+
+       client := GetOrNewRocketMQClient(ClientOption{})
+       req := &PullMessageRequest{
+               ConsumerGroup:  "testGroup",
+               Topic:          "wenfeng",
+               QueueId:        0,
+               QueueOffset:    0,
+               MaxMsgNums:     32,
+               SysFlag:        0x1 << 2,
+               SubExpression:  "*",
+               ExpressionType: "TAG",
+       }
+       res, err := client.PullMessage(context.Background(), "127.0.0.1:10911", req)
+       if err != nil {
+               t.Fatal(err.Error())
+       }
+
+       for _, a := range res.GetMessageExts() {
+               t.Log(string(a.Body))
+       }
+}
diff --git a/kernel/constants.go b/kernel/constants.go
new file mode 100644
index 0000000..a9eacac
--- /dev/null
+++ b/kernel/constants.go
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+const (
+       // RetryGroupTopicPrefix is the prefix GetRetryTopic puts in front of a
+       // consumer group name to form that group's retry topic name.
+       RetryGroupTopicPrefix = "%RETRY%"
+       // DefaultConsumerGroup is presumably the group name used when none is
+       // configured; its use is not visible here -- confirm at the call sites.
+       DefaultConsumerGroup  = "DEFAULT_CONSUMER"
+)
+
+// GetRetryTopic returns the retry topic name for group, which is
+// RetryGroupTopicPrefix immediately followed by group. The group name is not
+// validated; an empty group yields just the prefix.
+func GetRetryTopic(group string) string {
+       return RetryGroupTopicPrefix + group
+}
diff --git a/kernel/message.go b/kernel/message.go
index 843a743..117d766 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -20,32 +20,32 @@ package kernel
 import "fmt"
 
 const (
-       KeySeparator                   = " "
-       Keys                           = "KEYS"
-       Tags                           = "TAGS"
-       WaitStoreMsgOk                 = "WAIT"
-       DelayTimeLevel                 = "DELAY"
-       RetryTopic                     = "RETRY_TOPIC"
-       RealTopic                      = "REAL_TOPIC"
-       RealQueueId                    = "REAL_QID"
-       TransactionPrepared            = "TRAN_MSG"
-       ProducerGroup                  = "PGROUP"
-       MinOffset                      = "MIN_OFFSET"
-       MaxOffset                      = "MAX_OFFSET"
-       BuyerId                        = "BUYER_ID"
-       OriginMessageId                = "ORIGIN_MESSAGE_ID"
-       TransferFlag                   = "TRANSFER_FLAG"
-       CorrectionFlag                 = "CORRECTION_FLAG"
-       MQ2Flag                        = "MQ2_FLAG"
-       ReconsumeTime                  = "RECONSUME_TIME"
-       MsgRegion                      = "MSG_REGION"
-       TraceSwitch                    = "TRACE_ON"
-       UniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
-       MaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
-       ConsumeStartTime               = "CONSUME_START_TIME"
-       TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
-       TranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
-       CheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
+       PropertyKeySeparator                   = " "
+       PropertyKeys                           = "KEYS"
+       PropertyTags                           = "TAGS"
+       PropertyWaitStoreMsgOk                 = "WAIT"
+       PropertyDelayTimeLevel                 = "DELAY"
+       PropertyRetryTopic                     = "RETRY_TOPIC"
+       PropertyRealTopic                      = "REAL_TOPIC"
+       PropertyRealQueueId                    = "REAL_QID"
+       PropertyTransactionPrepared            = "TRAN_MSG"
+       PropertyProducerGroup                  = "PGROUP"
+       PropertyMinOffset                      = "MIN_OFFSET"
+       PropertyMaxOffset                      = "MAX_OFFSET"
+       PropertyBuyerId                        = "BUYER_ID"
+       PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
+       PropertyTransferFlag                   = "TRANSFER_FLAG"
+       PropertyCorrectionFlag                 = "CORRECTION_FLAG"
+       PropertyMQ2Flag                        = "MQ2_FLAG"
+       PropertyReconsumeTime                  = "RECONSUME_TIME"
+       PropertyMsgRegion                      = "MSG_REGION"
+       PropertyTraceSwitch                    = "TRACE_ON"
+       PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
+       PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
+       PropertyConsumeStartTime               = "CONSUME_START_TIME"
+       PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
+       PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
+       PropertyCheckImmunityTimeInSeconds     = 
"CHECK_IMMUNITY_TIME_IN_SECONDS"
 )
 
 type Message struct {
@@ -106,7 +106,7 @@ type MessageExt struct {
 }
 
 func (msgExt *MessageExt) GetTags() string {
-       return msgExt.Properties[Tags]
+       return msgExt.Properties[PropertyTags]
 }
 
 func (msgExt *MessageExt) String() string {
diff --git a/kernel/model.go b/kernel/model.go
index f9593ab..d9f4d00 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -20,7 +20,9 @@ package kernel
 import (
        "bytes"
        "encoding/binary"
+       "encoding/json"
        "fmt"
+       "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
 )
 
@@ -62,7 +64,7 @@ type PullStatus int
 const (
        PullFound PullStatus = iota
        PullNoNewMsg
-       PullNoMatchedMsg
+       PullNoMsgMatched
        PullOffsetIllegal
        PullBrokerTimeout
 )
@@ -92,6 +94,10 @@ func (result *PullResult) GetMessages() []*Message {
        return toMessages(result.messageExts)
 }
 
+// String returns a short, log-friendly summary of the pull result: its
+// numeric status and the number of message extensions it carries. A nil
+// receiver is rendered as "PullResult <nil>" rather than panicking.
+func (result *PullResult) String() string {
+       if result == nil {
+               return "PullResult <nil>"
+       }
+       // %d prints the raw PullStatus value without relying on a String
+       // method being defined for that type.
+       return fmt.Sprintf("PullResult [status=%d, msgCount=%d]",
+               result.Status, len(result.messageExts))
+}
+
 func decodeMessage(data []byte) []*MessageExt {
        msgs := make([]*MessageExt, 0)
        buf := bytes.NewBuffer(data)
@@ -224,10 +230,15 @@ func (mq *MessageQueue) HashCode() int {
        return result
 }
 
+// Equals reports whether mq and queue refer to the same message queue, that
+// is, whether their Topic, BrokerName and QueueId all match. Two nil queues
+// are equal; a nil queue never equals a non-nil one.
+func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
+       if mq == nil || queue == nil {
+               return mq == queue
+       }
+       return mq.Topic == queue.Topic &&
+               mq.BrokerName == queue.BrokerName &&
+               mq.QueueId == queue.QueueId
+}
+
 type FindBrokerResult struct {
        BrokerAddr    string
        Slave         bool
-       BrokerVersion int
+       BrokerVersion int32
 }
 
 type (
@@ -236,90 +247,16 @@ type (
 
        consumeType string
 
-       MessageModel     int
-       ConsumeFromWhere int
-       ServiceState     int
+       ServiceState int
 )
 
 const (
-       ConsumeActively  = consumeType("PULL")
-       ConsumePassively = consumeType("PUSH")
-
-       BroadCasting = MessageModel(1)
-       Clustering   = MessageModel(2)
-
-       ConsumeFromLastOffset ConsumeFromWhere = iota
-       ConsumeFromFirstOffset
-       ConsumeFromTimestamp
-
-       CreateJust ServiceState = iota
-       Running
-       Shutdown
+       StateCreateJust ServiceState = iota
+       StateStartFailed
+       StateRunning
+       StateShutdown
 )
 
-func (mode MessageModel) String() string {
-       switch mode {
-       case BroadCasting:
-               return "BroadCasting"
-       case Clustering:
-               return "Clustering"
-       default:
-               return "Unknown"
-       }
-}
-
-type ExpressionType string
-
-const (
-       /**
-        * <ul>
-        * Keywords:
-        * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
-        * </ul>
-        * <p/>
-        * <ul>
-        * Data type:
-        * <li>Boolean, like: TRUE, FALSE</li>
-        * <li>String, like: 'abc'</li>
-        * <li>Decimal, like: 123</li>
-        * <li>Float number, like: 3.1415</li>
-        * </ul>
-        * <p/>
-        * <ul>
-        * Grammar:
-        * <li>{@code AND, OR}</li>
-        * <li>{@code >, >=, <, <=, =}</li>
-        * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
-        * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
-        * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this 
operation only support String type.</li>
-        * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is 
null, or not.</li>
-        * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, 
or false.</li>
-        * </ul>
-        * <p/>
-        * <p>
-        * Example:
-        * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
-        * </p>
-        */
-       SQL92 = ExpressionType("SQL92")
-
-       /**
-        * Only support or operation such as
-        * "tag1 || tag2 || tag3", <br>
-        * If null or * expression, meaning subscribe all.
-        */
-       TAG = ExpressionType("TAG")
-)
-
-func IsTagType(exp string) bool {
-       if exp == "" || exp == "TAG" {
-               return true
-       }
-       return false
-}
-
-var SubAll = "*"
-
 type SubscriptionData struct {
        ClassFilterMode bool
        Topic           string
@@ -327,20 +264,30 @@ type SubscriptionData struct {
        Tags            map[string]bool
        Codes           map[int32]bool
        SubVersion      int64
-       ExpType         ExpressionType
+       ExpType         string
 }
 
 type consumerData struct {
-       groupName         string
-       cType             consumeType
-       messageModel      MessageModel
-       where             ConsumeFromWhere
-       subscriptionDatas []SubscriptionData
-       unitMode          bool
+       GroupName         string              `json:"groupName"`
+       CType             consumeType         `json:"consumeType"`
+       MessageModel      string              `json:"messageModel"`
+       Where             string              `json:"consumeFromWhere"`
+       SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
+       UnitMode          bool                `json:"unitMode"`
 }
 
 type heartbeatData struct {
-       clientId      string
-       producerDatas []producerData
-       consumerDatas []consumerData
+       ClientId      string         `json:"clientID"`
+       ProducerDatas []producerData `json:"producerDataSet"`
+       ConsumerDatas []consumerData `json:"consumerDataSet"`
+}
+
+// encode serialises the heartbeat as JSON. On success the payload is logged
+// at info level and returned; if marshalling fails the error is logged and
+// nil is returned.
+func (data *heartbeatData) encode() []byte {
+       payload, err := json.Marshal(data)
+       if err == nil {
+               rlog.Info(string(payload))
+               return payload
+       }
+       rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+       return nil
 }
diff --git a/kernel/perm.go b/kernel/perm.go
index e78a876..2d568d5 100644
--- a/kernel/perm.go
+++ b/kernel/perm.go
@@ -1,19 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file dqueueIstributed with
- * thqueueIs work for additional information regarding copyright ownership.
- * The ASF licenses thqueueIs file to You under the Apache License, Version 2.0
- * (the "License"); you may not use thqueueIs file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  dqueueIstributed under the License queueIs dqueueIstributed on an "AS IS" 
BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permqueueIssions and
- *  limitations under the License.
- */
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
 
 package kernel
 
diff --git a/kernel/request.go b/kernel/request.go
index c5e0fef..2647ad5 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -17,12 +17,19 @@ limitations under the License.
 
 package kernel
 
-import "fmt"
+import (
+       "fmt"
+       "time"
+)
 
 const (
-       ReqPullMessage         = int16(11)
-       ReqGetRouteInfoByTopic = int16(105)
-       ReqSendBatchMessage    = int16(320)
+       ReqPullMessage            = int16(11)
+       ReqHeartBeat              = int16(34)
+       ReqGetConsumerListByGroup = int16(38)
+       ReqLockBatchMQ            = int16(41)
+       ReqUnlockBatchMQ          = int16(42)
+       ReqGetRouteInfoByTopic    = int16(105)
+       ReqSendBatchMessage       = int16(320)
 )
 
 type SendMessageRequest struct {
@@ -49,24 +56,24 @@ func (request *SendMessageRequest) Decode(properties 
map[string]string) error {
 }
 
 type PullMessageRequest struct {
-       ConsumerGroup        string `json:"consumerGroup"`
-       Topic                string `json:"topic"`
-       QueueId              int32  `json:"queueId"`
-       QueueOffset          int64  `json:"queueOffset"`
-       MaxMsgNums           int32  `json:"maxMsgNums"`
-       SysFlag              int32  `json:"sysFlag"`
-       CommitOffset         int64  `json:"commitOffset"`
-       SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
-       SubExpression        string `json:"subscription"`
-       SubVersion           int64  `json:"subVersion"`
-       ExpressionType       string `json:"expressionType"`
+       ConsumerGroup        string        `json:"consumerGroup"`
+       Topic                string        `json:"topic"`
+       QueueId              int32         `json:"queueId"`
+       QueueOffset          int64         `json:"queueOffset"`
+       MaxMsgNums           int32         `json:"maxMsgNums"`
+       SysFlag              int32         `json:"sysFlag"`
+       CommitOffset         int64         `json:"commitOffset"`
+       SuspendTimeoutMillis time.Duration `json:"suspendTimeoutMillis"`
+       SubExpression        string        `json:"subscription"`
+       SubVersion           int64         `json:"subVersion"`
+       ExpressionType       string        `json:"expressionType"`
 }
 
 func (request *PullMessageRequest) Encode() map[string]string {
        maps := make(map[string]string)
        maps["consumerGroup"] = request.ConsumerGroup
        maps["topic"] = request.Topic
-       maps["queueId"] = fmt.Sprintf("%d", request.QueueOffset)
+       maps["queueId"] = fmt.Sprintf("%d", request.QueueId)
        maps["queueOffset"] = fmt.Sprintf("%d", request.QueueOffset)
        maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
        maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
@@ -78,6 +85,16 @@ func (request *PullMessageRequest) Encode() 
map[string]string {
        return maps
 }
 
+// GetConsumerList is the request header that carries the consumer group
+// name; it is encoded under the key "consumerGroup".
+type GetConsumerList struct {
+       ConsumerGroup string `json:"consumerGroup"`
+}
+
+// Encode returns the request as a string map with the single entry
+// "consumerGroup" set to the request's ConsumerGroup.
+func (request *GetConsumerList) Encode() map[string]string {
+       return map[string]string{
+               "consumerGroup": request.ConsumerGroup,
+       }
+}
+
 type GetMaxOffsetRequest struct {
        Topic   string `json:"topic"`
        QueueId int32  `json:"queueId"`
diff --git a/kernel/route.go b/kernel/route.go
index dc08edc..f8cd884 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -22,7 +22,9 @@ import (
        "errors"
        "github.com/apache/rocketmq-client-go/remote"
        "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
        "github.com/tidwall/gjson"
+       "math/rand"
        "sort"
        "strconv"
        "strings"
@@ -32,7 +34,7 @@ import (
 )
 
 const (
-       requestTimeout   = 3000
+       requestTimeout   = 3 * time.Second
        defaultTopic     = "TBW102"
        defaultQueueNums = 4
        MasterId         = int64(0)
@@ -49,9 +51,10 @@ var (
        // brokerName -> map[string]int32
        brokerVersionMap sync.Map
 
-       publishInfoMap sync.Map
-       routeDataMap   sync.Map
-       lockNamesrv    sync.Mutex
+       //publishInfoMap sync.Map
+       //subscribeInfoMap sync.Map
+       routeDataMap sync.Map
+       lockNamesrv  sync.Mutex
 )
 
 // key is topic, value is TopicPublishInfo
@@ -59,7 +62,7 @@ type TopicPublishInfo struct {
        OrderTopic          bool
        HaveTopicRouterInfo bool
        MqList              []*MessageQueue
-       RouteData           *topicRouteData
+       RouteData           *TopicRouteData
        TopicQueueIndex     int32
 }
 
@@ -76,55 +79,63 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
        return int(qIndex) % length
 }
 
-func UpdateTopicRouteInfo(topic string) {
+func UpdateTopicRouteInfo(topic string) *TopicRouteData {
        // Todo process lock timeout
        lockNamesrv.Lock()
        defer lockNamesrv.Unlock()
 
-       routeData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+       routeData, err := queryTopicRouteInfoFromServer(topic)
        if err != nil {
                rlog.Warnf("query topic route from server error: %s", err)
-               return
+               return nil
        }
 
        if routeData == nil {
                rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic: 
%s", topic)
-               return
+               return nil
        }
 
-       var changed bool
        oldRouteData, exist := routeDataMap.Load(topic)
-       if !exist || routeData == nil {
-               changed = true
-       } else {
-               changed = 
topicRouteDataIsChange(oldRouteData.(*topicRouteData), routeData)
-       }
-
-       if !changed {
-               changed = isNeedUpdateTopicRouteInfo(topic)
-       } else {
-               rlog.Infof("the topic[%s] route info changed, old[%v] 
,new[%s]", topic, oldRouteData, routeData)
+       changed := true
+       if exist {
+               changed = 
topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
        }
 
-       if !changed {
-               return
+       if changed {
+               routeDataMap.Store(topic, routeData)
+               rlog.Infof("the topic [%s] route info changed, old %v ,new %s", 
topic,
+                       oldRouteData, routeData.String())
+               for _, brokerData := range routeData.BrokerDataList {
+                       brokerAddressesMap.Store(brokerData.BrokerName, 
brokerData)
+               }
        }
 
-       newTopicRouteData := routeData.clone()
+       return routeData.clone()
+}
 
-       for _, brokerData := range newTopicRouteData.BrokerDataList {
-               brokerAddressesMap.Store(brokerData.BrokerName, 
brokerData.BrokerAddresses)
+func FindBrokerAddrByTopic(topic string) string {
+       v, exist := routeDataMap.Load(topic)
+       if !exist {
+               return ""
        }
-
-       // update publish info
-       publishInfo := routeData2PublishInfo(topic, routeData)
-       publishInfo.HaveTopicRouterInfo = true
-
-       old, _ := publishInfoMap.Load(topic)
-       publishInfoMap.Store(topic, publishInfoMap)
-       if old != nil {
-               rlog.Infof("Old TopicPublishInfo [%s] removed.", old)
+       routeData := v.(*TopicRouteData)
+       if len(routeData.BrokerDataList) == 0 {
+               return ""
+       }
+       i := utils.AbsInt(rand.Int())
+       bd := routeData.BrokerDataList[i%len(routeData.BrokerDataList)]
+       addr := bd.BrokerAddresses[MasterId]
+       if addr == "" && len(bd.BrokerAddresses) > 0 {
+               i = i % len(bd.BrokerAddresses)
+               for _, v := range bd.BrokerAddresses {
+                       if i <= 0 {
+                               addr = v
+                               break
+                       }
+                       i--
+               }
        }
+       return addr
 }
 
 func FindBrokerAddressInPublish(brokerName string) string {
@@ -144,18 +155,20 @@ func FindBrokerAddressInSubscribe(brokerName string, 
brokerId int64, onlyThisBro
                found      = false
        )
 
-       addrs, exist := brokerAddressesMap.Load(brokerName)
+       v, exist := brokerAddressesMap.Load(brokerName)
 
-       if exist {
-               for k, v := range addrs.(map[int64]string) {
-                       if v != "" {
-                               found = true
-                               if k != MasterId {
-                                       slave = true
-                               }
-                               brokerAddr = v
-                               break
+       if !exist {
+               return nil
+       }
+       data := v.(*BrokerData)
+       for k, v := range data.BrokerAddresses {
+               if v != "" {
+                       found = true
+                       if k != MasterId {
+                               slave = true
                        }
+                       brokerAddr = v
+                       break
                }
        }
 
@@ -172,7 +185,7 @@ func FindBrokerAddressInSubscribe(brokerName string, 
brokerId int64, onlyThisBro
 }
 
 func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
-       routeData, err := queryTopicRouteInfoFromServer(topic, 3*time.Second)
+       routeData, err := queryTopicRouteInfoFromServer(topic)
 
        if err != nil {
                return nil, err
@@ -190,14 +203,14 @@ func FetchSubscribeMessageQueues(topic string) 
([]*MessageQueue, error) {
        return mqs, nil
 }
 
-func findBrokerVersion(brokerName, brokerAddr string) int {
+func findBrokerVersion(brokerName, brokerAddr string) int32 {
        versions, exist := brokerVersionMap.Load(brokerName)
 
        if !exist {
                return 0
        }
 
-       v, exist := versions.(map[string]int)[brokerAddr]
+       v, exist := versions.(map[string]int32)[brokerAddr]
 
        if exist {
                return v
@@ -205,12 +218,12 @@ func findBrokerVersion(brokerName, brokerAddr string) int 
{
        return 0
 }
 
-func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) 
(*topicRouteData, error) {
+func queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
        request := &GetRouteInfoRequest{
                Topic: topic,
        }
        rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-       response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
+       response, err := remote.InvokeSync(getNameServerAddress(), rc, 
requestTimeout)
 
        if err != nil {
                return nil, err
@@ -221,11 +234,11 @@ func queryTopicRouteInfoFromServer(topic string, timeout 
time.Duration) (*topicR
                if response.Body == nil {
                        return nil, errors.New(response.Remark)
                }
-               routeData := &topicRouteData{}
+               routeData := &TopicRouteData{}
 
                err = routeData.decode(string(response.Body))
                if err != nil {
-                       rlog.Warnf("decode topicRouteData error: %s", err)
+                       rlog.Warnf("decode TopicRouteData error: %s", err)
                        return nil, err
                }
                return routeData, nil
@@ -236,7 +249,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout 
time.Duration) (*topicR
        }
 }
 
-func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) 
bool {
+func topicRouteDataIsChange(oldData *TopicRouteData, newData *TopicRouteData) 
bool {
        if oldData == nil || newData == nil {
                return true
        }
@@ -259,13 +272,7 @@ func topicRouteDataIsChange(oldData *topicRouteData, 
newData *topicRouteData) bo
        return !oldDataCloned.equals(newDataCloned)
 }
 
-func isNeedUpdateTopicRouteInfo(topic string) bool {
-       value, exist := publishInfoMap.Load(topic)
-
-       return !exist || value.(*TopicPublishInfo).isOK()
-}
-
-func routeData2PublishInfo(topic string, data *topicRouteData) 
*TopicPublishInfo {
+func routeData2PublishInfo(topic string, data *TopicRouteData) 
*TopicPublishInfo {
        publishInfo := &TopicPublishInfo{
                RouteData:  data,
                OrderTopic: false,
@@ -329,16 +336,20 @@ func getNameServerAddress() string {
        return "127.0.0.1:9876"
 }
 
-// topicRouteData topicRouteData
-type topicRouteData struct {
+// TopicRouteData is the decoded result of a topic route query: the queue layout ("queueDatas") and the brokers ("brokerDatas") reported for a topic.
+type TopicRouteData struct {
        OrderTopicConf string
        QueueDataList  []*QueueData  `json:"queueDatas"`
        BrokerDataList []*BrokerData `json:"brokerDatas"`
 }
 
-func (routeData *topicRouteData) decode(data string) error {
+func (routeData *TopicRouteData) decode(data string) error {
        res := gjson.Parse(data)
-       json.Unmarshal([]byte(res.Get("queueDatas").String()), 
&routeData.QueueDataList)
+       err := json.Unmarshal([]byte(res.Get("queueDatas").String()), 
&routeData.QueueDataList)
+
+       if err != nil {
+               return err
+       }
 
        bds := res.Get("brokerDatas").Array()
        routeData.BrokerDataList = make([]*BrokerData, len(bds))
@@ -365,8 +376,8 @@ func (routeData *topicRouteData) decode(data string) error {
        return nil
 }
 
-func (routeData *topicRouteData) clone() *topicRouteData {
-       cloned := &topicRouteData{
+func (routeData *TopicRouteData) clone() *TopicRouteData {
+       cloned := &TopicRouteData{
                OrderTopicConf: routeData.OrderTopicConf,
                QueueDataList:  make([]*QueueData, 
len(routeData.QueueDataList)),
                BrokerDataList: make([]*BrokerData, 
len(routeData.BrokerDataList)),
@@ -383,10 +394,15 @@ func (routeData *topicRouteData) clone() *topicRouteData {
        return cloned
 }
 
-func (routeData *topicRouteData) equals(data *topicRouteData) bool {
+// equals is a placeholder that always returns false: it does not compare
+// routeData with data yet, so every pair is treated as different.
+// TODO: implement a real field-by-field comparison.
+func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
        return false
 }
 
+// String returns routeData encoded as JSON text.
+// A marshalling error is deliberately discarded; in that case the encoded
+// bytes are nil and the result is the empty string.
+func (routeData *TopicRouteData) String() string {
+       encoded, _ := json.Marshal(routeData)
+       return string(encoded)
+}
+
 // QueueData QueueData
 type QueueData struct {
        BrokerName     string `json:"brokerName"`
diff --git a/kernel/validators.go b/kernel/validators.go
new file mode 100644
index 0000000..f34bd58
--- /dev/null
+++ b/kernel/validators.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+       "github.com/apache/rocketmq-client-go/rlog"
+       "regexp"
+)
+
+const (
+       // _ValidPattern is the regular expression a group name must match.
+       _ValidPattern = "^[%|a-zA-Z0-9_-]+$"
+       // _CharacterMaxLength is the maximum group name length, in bytes.
+       _CharacterMaxLength = 255
+)
+
+var (
+       // _Pattern is _ValidPattern compiled once at package initialisation.
+       // It must be built from the constant itself, not from the quoted
+       // identifier name. The pattern is a fixed, valid expression, so
+       // MustCompile cannot panic here.
+       _Pattern = regexp.MustCompile(_ValidPattern)
+)
+
+// ValidateGroup checks a consumer group name and calls rlog.Fatal when the
+// name is empty or longer than _CharacterMaxLength bytes.
+// The character-set check against _ValidPattern is currently commented out.
+// NOTE(review): presumably rlog.Fatal terminates the process - confirm.
+func ValidateGroup(group string) {
+       if group == "" {
+               rlog.Fatal("consumerGroup is empty")
+       }
+
+       //if !_Pattern.Match([]byte(group)) {
+       //      rlog.Fatalf("the specified group[%s] contains illegal 
characters, allowing only %s", group, _ValidPattern)
+       //}
+
+       if len(group) > _CharacterMaxLength {
+               rlog.Fatal("the specified group is longer than group max length 
255.")
+       }
+}
diff --git a/remote/codes.go b/remote/codes.go
deleted file mode 100644
index b94d2e2..0000000
--- a/remote/codes.go
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package remote
-
-const (
-       SEND_MESSAGE                        = 10
-       PULL_MESSAGE                        = 11
-       QUERY_MESSAGE                       = 12
-       QUERY_BROKER_OFFSET                 = 13
-       QUERY_CONSUMER_OFFSET               = 14
-       UPDATE_CONSUMER_OFFSET              = 15
-       UPDATE_AND_CREATE_TOPIC             = 17
-       GET_ALL_TOPIC_CONFIG                = 21
-       GET_TOPIC_CONFIG_LIST               = 22
-       GET_TOPIC_NAME_LIST                 = 23
-       UPDATE_BROKER_CONFIG                = 25
-       GET_BROKER_CONFIG                   = 26
-       TRIGGER_DELETE_FILES                = 27
-       GET_BROKER_RUNTIME_INFO             = 28
-       SEARCH_OFFSET_BY_TIMESTAMP          = 29
-       GET_MAX_OFFSET                      = 30
-       GET_MIN_OFFSET                      = 31
-       GET_EARLIEST_MSG_STORETIME          = 32
-       VIEW_MESSAGE_BY_ID                  = 33
-       HEART_BEAT                          = 34
-       UNREGISTER_CLIENT                   = 35
-       CONSUMER_SEND_MSG_BACK              = 36
-       END_TRANSACTION                     = 37
-       GET_CONSUMER_LIST_BY_GROUP          = 38
-       CHECK_TRANSACTION_STATE             = 39
-       NOTIFY_CONSUMER_IDS_CHANGED         = 40
-       LOCK_BATCH_MQ                       = 41
-       UNLOCK_BATCH_MQ                     = 42
-       GET_ALL_CONSUMER_OFFSET             = 43
-       GET_ALL_DELAY_OFFSET                = 45
-       PUT_KV_CONFIG                       = 100
-       GET_KV_CONFIG                       = 101
-       DELETE_KV_CONFIG                    = 102
-       REGISTER_BROKER                     = 103
-       UNREGISTER_BROKER                   = 104
-       GET_ROUTEINTO_BY_TOPIC              = 105
-       GET_BROKER_CLUSTER_INFO             = 106
-       UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
-       GET_ALL_SUBSCRIPTIONGROUP_CONFIG    = 201
-       GET_TOPIC_STATS_INFO                = 202
-       GET_CONSUMER_CONNECTION_LIST        = 203
-       GET_PRODUCER_CONNECTION_LIST        = 204
-       WIPE_WRITE_PERM_OF_BROKER           = 205
-
-       GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
-       DELETE_SUBSCRIPTIONGROUP           = 207
-       GET_CONSUME_STATS                  = 208
-       SUSPEND_CONSUMER                   = 209
-       RESUME_CONSUMER                    = 210
-       RESET_CONSUMER_OFFSET_IN_CONSUMER  = 211
-       RESET_CONSUMER_OFFSET_IN_BROKER    = 212
-       ADJUST_CONSUMER_THREAD_POOL        = 213
-       WHO_CONSUME_THE_MESSAGE            = 214
-
-       DELETE_TOPIC_IN_BROKER    = 215
-       DELETE_TOPIC_IN_NAMESRV   = 216
-       GET_KV_CONFIG_BY_VALUE    = 217
-       DELETE_KV_CONFIG_BY_VALUE = 218
-       GET_KVLIST_BY_NAMESPACE   = 219
-
-       RESET_CONSUMER_CLIENT_OFFSET         = 220
-       GET_CONSUMER_STATUS_FROM_CLIENT      = 221
-       INVOKE_BROKER_TO_RESET_OFFSET        = 222
-       INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
-
-       QUERY_TOPIC_CONSUME_BY_WHO = 300
-
-       GET_TOPICS_BY_CLUSTER = 224
-
-       REGISTER_FILTER_SERVER            = 301
-       REGISTER_MESSAGE_FILTER_CLASS     = 302
-       QUERY_CONSUME_TIME_SPAN           = 303
-       GET_SYSTEM_TOPIC_LIST_FROM_NS     = 304
-       GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
-
-       CLEAN_EXPIRED_CONSUMEQUEUE = 306
-
-       GET_CONSUMER_RUNNING_INFO = 307
-
-       QUERY_CORRECTION_OFFSET = 308
-
-       CONSUME_MESSAGE_DIRECTLY = 309
-
-       SEND_MESSAGE_V2 = 310
-
-       GET_UNIT_TOPIC_LIST                = 311
-       GET_HAS_UNIT_SUB_TOPIC_LIST        = 312
-       GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
-       CLONE_GROUP_OFFSET                 = 314
-
-       VIEW_BROKER_STATS_DATA = 315
-)
-
-const (
-       SUCCESS                       = 0
-       SYSTEM_ERROR                  = 1
-       SYSTEM_BUSY                   = 2
-       REQUEST_CODE_NOT_SUPPORTED    = 3
-       TRANSACTION_FAILED            = 4
-       FLUSH_DISK_TIMEOUT            = 10
-       SLAVE_NOT_AVAILABLE           = 11
-       FLUSH_SLAVE_TIMEOUT           = 12
-       MESSAGE_ILLEGAL               = 13
-       SERVICE_NOT_AVAILABLE         = 14
-       VERSION_NOT_SUPPORTED         = 15
-       NO_PERMISSION                 = 16
-       TOPIC_NOT_EXIST               = 17
-       TOPIC_EXIST_ALREADY           = 18
-       PULL_NOT_FOUND                = 19
-       PULL_RETRY_IMMEDIATELY        = 20
-       PULL_OFFSET_MOVED             = 21
-       QUERY_NOT_FOUND               = 22
-       SUBSCRIPTION_PARSE_FAILED     = 23
-       SUBSCRIPTION_NOT_EXIST        = 24
-       SUBSCRIPTION_NOT_LATEST       = 25
-       SUBSCRIPTION_GROUP_NOT_EXIST  = 26
-       TRANSACTION_SHOULD_COMMIT     = 200
-       TRANSACTION_SHOULD_ROLLBACK   = 201
-       TRANSACTION_STATE_UNKNOW      = 202
-       TRANSACTION_STATE_GROUP_WRONG = 203
-       NO_BUYER_ID                   = 204
-
-       NOT_IN_CURRENT_UNIT = 205
-
-       CONSUMER_NOT_ONLINE = 206
-
-       CONSUME_MSG_TIMEOUT = 207
-)
diff --git a/remote/client.go b/remote/remote_client.go
similarity index 88%
rename from remote/client.go
rename to remote/remote_client.go
index bfc4c36..9f5839a 100644
--- a/remote/client.go
+++ b/remote/remote_client.go
@@ -21,6 +21,7 @@ import (
        "bytes"
        "encoding/binary"
        "errors"
+       "github.com/apache/rocketmq-client-go/rlog"
        "io"
        "net"
        "sync"
@@ -164,22 +165,26 @@ func connect(addr string) (net.Conn, error) {
 
 func receiveResponse(r net.Conn) {
        scanner := createScanner(r)
-       for scanner.Scan() {
+       for {
+               scanner.Scan()
                receivedRemotingCommand, err := decode(scanner.Bytes())
                if err != nil {
                        closeConnection(r)
+                       rlog.Error(err.Error())
                        break
                }
                if receivedRemotingCommand.isResponseType() {
-                       resp, ok := 
responseTable.Load(receivedRemotingCommand.Opaque)
-                       if ok {
+                       resp, exist := 
responseTable.Load(receivedRemotingCommand.Opaque)
+                       if exist {
                                
responseTable.Delete(receivedRemotingCommand.Opaque)
                                responseFuture := resp.(*ResponseFuture)
-                               responseFuture.ResponseCommand = 
receivedRemotingCommand
-                               responseFuture.executeInvokeCallback()
-                               if responseFuture.Done != nil {
-                                       responseFuture.Done <- true
-                               }
+                               go func() {
+                                       responseFuture.ResponseCommand = 
receivedRemotingCommand
+                                       responseFuture.executeInvokeCallback()
+                                       if responseFuture.Done != nil {
+                                               responseFuture.Done <- true
+                                       }
+                               }()
                        }
                } else {
                        // todo handler request from peer
@@ -190,10 +195,20 @@ func receiveResponse(r net.Conn) {
 func createScanner(r io.Reader) *bufio.Scanner {
        scanner := bufio.NewScanner(r)
        scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
+               defer func() {
+                       if err := recover(); err != nil {
+                               rlog.Errorf("panic: %v", err)
+                       }
+               }()
                if !atEOF {
                        if len(data) >= 4 {
                                var length int32
-                               binary.Read(bytes.NewReader(data[0:4]), 
binary.BigEndian, &length)
+                               err := binary.Read(bytes.NewReader(data[0:4]), 
binary.BigEndian, &length)
+                               if err != nil {
+                                       rlog.Errorf("split data error: %s", 
err.Error())
+                                       return 0, nil, err
+                               }
+
                                if int(length)+4 <= len(data) {
                                        return int(length) + 4, data[4 : 
length+4], nil
                                }
diff --git a/remote/client_test.go b/remote/remote_client_test.go
similarity index 100%
rename from remote/client_test.go
rename to remote/remote_client_test.go
diff --git a/rlog/log.go b/rlog/log.go
index c73c9ba..4f5adaf 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -22,8 +22,6 @@ import (
 )
 
 type Logger interface {
-       Print(i ...interface{})
-       Printf(format string, args ...interface{})
        Debug(i ...interface{})
        Debugf(format string, args ...interface{})
        Info(i ...interface{})
@@ -42,14 +40,6 @@ func SetLogger(log Logger) {
        rLog = log
 }
 
-func Print(i ...interface{}) {
-       rLog.Print(i...)
-}
-
-func Printf(format string, args ...interface{}) {
-       rLog.Printf(format, args...)
-}
-
 func Debug(i ...interface{}) {
        rLog.Debug(i...)
 }
diff --git a/utils/helper.go b/utils/helper.go
index 485ae15..1ad0441 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -22,7 +22,6 @@ import (
        "encoding/binary"
        "errors"
        "fmt"
-       "hash/crc32"
        "net"
        "os"
        "sync"
@@ -99,14 +98,10 @@ func ClassLoaderID() int32 {
        return 0
 }
 
-// HashString hashes a string to a unique hashcode.
-func HashString(s string) int {
-       if s == "" {
-               return 0
-       }
-       return int(crc32.ChecksumIEEE([]byte(s)))
-}
-
 func UnCompress(data []byte) []byte {
        return data
 }
+
+// IsArrayEmpty reports whether it received no arguments, either because it
+// was called with none or because an empty slice was expanded with "...".
+// Note: a slice passed as a single argument counts as one element, so
+// IsArrayEmpty([]string{}) is false.
+func IsArrayEmpty(i ...interface{}) bool {
+       // The length of a nil slice is 0, so this also covers the nil case.
+       return len(i) == 0
+}
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 4e1877c..967b0ae 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -16,7 +16,9 @@
  */
 package utils
 
-import "testing"
+import (
+       "testing"
+)
 
 func TestClassLoaderID(t *testing.T) {
        if ClassLoaderID() != 0 {
diff --git a/utils/math.go b/utils/math.go
new file mode 100644
index 0000000..816631e
--- /dev/null
+++ b/utils/math.go
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+// AbsInt returns the absolute value of i.
+// The most negative int has no positive counterpart: negating it overflows
+// and yields the same negative value.
+func AbsInt(i int) int {
+       if i < 0 {
+               return -i
+       }
+       return i
+}
+
+// MinInt returns the smaller of a and b.
+func MinInt(a, b int) int {
+       smaller := b
+       if a < b {
+               smaller = a
+       }
+       return smaller
+}
diff --git a/utils/string.go b/utils/string.go
new file mode 100644
index 0000000..6a74808
--- /dev/null
+++ b/utils/string.go
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import "fmt"
+
+// HashString folds the bytes of s into an int with the polynomial
+// h = 31*h + b, starting from 0, so the empty string hashes to 0.
+// The result is a hash code, not a unique identifier: distinct strings
+// may produce the same value.
+func HashString(s string) int {
+       hash := 0
+       for i := 0; i < len(s); i++ {
+               hash = hash*31 + int(s[i])
+       }
+       return hash
+}
+
+// StrJoin appends "key: value, " to str and returns the result, formatting
+// value with fmt.Sprint. When key is empty, or value is the empty string,
+// str is returned unchanged.
+func StrJoin(str, key string, value interface{}) string {
+       if key != "" && value != "" {
+               return str + key + ": " + fmt.Sprint(value) + ", "
+       }
+       return str
+}

Reply via email to