This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 529c872  Add ClientProcessor and optimize (#52)
529c872 is described below

commit 529c872f2473247f8735d43a2dbc3c9c72c7d93e
Author: wenfeng <[email protected]>
AuthorDate: Tue May 7 11:35:14 2019 +0800

    Add ClientProcessor and optimize (#52)
    
    * replace list with TreeMap in process_queue
    
    * refactor for processor
    
    * update feature list
    
    * add cleanOfflineBroker
---
 benchmark/producer.go                   |   8 +-
 benchmark/stable.go                     |   6 +-
 consumer/consumer.go                    | 100 ++++++++++++----
 consumer/offset_store.go                |  99 ++++++++-------
 consumer/process_queue.go               | 205 ++++++++++++++++++++++++++------
 consumer/push_consumer.go               | 119 ++++++++++++++----
 core/api.go                             |   4 +-
 core/error.go                           |   2 +-
 core/producer.go                        |   4 +-
 docs/feature.md                         | 142 ++++++++++------------
 examples/{producer => consumer}/main.go |   7 +-
 go.mod                                  |   3 -
 go.sum                                  |   6 -
 kernel/client.go                        | 113 +++++++-----------
 kernel/model.go                         |   2 +-
 kernel/request.go                       |  29 +++--
 kernel/route.go                         |  99 ++++++++++++++-
 remote/processor.go                     |  26 ----
 remote/remote_client.go                 | 118 +++++++++++-------
 rlog/log.go                             |  12 +-
 utils/helper.go                         |  25 ----
 utils/helper_test.go                    |   9 --
 utils/net.go                            |  30 +++++
 utils/net_test.go                       |   7 ++
 24 files changed, 749 insertions(+), 426 deletions(-)

diff --git a/benchmark/producer.go b/benchmark/producer.go
index e183269..4551615 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -108,7 +108,7 @@ type producer struct {
 
 func init() {
        p := &producer{}
-       flags := flag.NewFlagSet("consumer", flag.ExitOnError)
+       flags := flag.NewFlagSet("producer", flag.ExitOnError)
        p.flags = flags
 
        flags.StringVar(&p.topic, "t", "", "topic name")
@@ -118,7 +118,7 @@ func init() {
        flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
        flags.IntVar(&p.bodySize, "s", 32, "body size")
 
-       registerCommand("consumer", p)
+       registerCommand("producer", p)
 }
 
 func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit 
chan struct{}) {
@@ -126,14 +126,14 @@ func (bp *producer) produceMsg(stati 
*statiBenchmarkProducerSnapshot, exit chan
                ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, 
NameServer: bp.nameSrv},
        })
        if err != nil {
-               fmt.Printf("new consumer error:%s\n", err)
+               fmt.Printf("new producer error:%s\n", err)
                return
        }
 
        p.Start()
        defer p.Shutdown()
 
-       topic, tag := bp.topic, "benchmark-consumer"
+       topic, tag := bp.topic, "benchmark-producer"
 
 AGAIN:
        select {
diff --git a/benchmark/stable.go b/benchmark/stable.go
index 6c7a12c..46c7b5c 100644
--- a/benchmark/stable.go
+++ b/benchmark/stable.go
@@ -145,13 +145,13 @@ func (stp *stableTestProducer) run(args []string) {
                ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, 
NameServer: stp.nameSrv},
        })
        if err != nil {
-               fmt.Printf("new consumer error:%s\n", err)
+               fmt.Printf("new producer error:%s\n", err)
                return
        }
 
        err = p.Start()
        if err != nil {
-               fmt.Printf("start consumer error:%s\n", err)
+               fmt.Printf("start producer error:%s\n", err)
                return
        }
        defer p.Shutdown()
@@ -256,7 +256,7 @@ func (stc *stableTestConsumer) pullMessage() {
 }
 
 func init() {
-       // consumer
+       // producer
        name := "stableTestProducer"
        p := &stableTestProducer{stableTest: &stableTest{}}
        p.buildFlags(name)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index f128f66..a6d9500 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/rocketmq-client-go/utils"
        "github.com/tidwall/gjson"
        "sort"
+       "strconv"
        "strings"
        "sync"
        "sync/atomic"
@@ -181,7 +182,7 @@ const (
 type PullRequest struct {
        consumerGroup string
        mq            *kernel.MessageQueue
-       pq            *ProcessQueue
+       pq            *processQueue
        nextOffset    int64
        lockedFirst   bool
 }
@@ -209,7 +210,7 @@ type ConsumerOption struct {
 
        // Flow control threshold on queue level, each message queue will cache 
at most 1000 messages by default,
        // Consider the {PullBatchSize}, the instantaneous value may exceed the 
limit
-       PullThresholdForQueue int
+       PullThresholdForQueue int64
 
        // Limit the cached message size on queue level, each message queue 
will cache at most 100 MiB messages by default,
        // Consider the {@code pullBatchSize}, the instantaneous value may 
exceed the limit
@@ -408,8 +409,10 @@ func (dc *defaultConsumer) SubscriptionDataList() 
[]*kernel.SubscriptionData {
 }
 
 func (dc *defaultConsumer) makeSureStateOK() error {
-       // TODO log
-       return nil //dc.state == StateRunning
+       if dc.state != kernel.StateRunning {
+               return fmt.Errorf("state not running, actually: %v", dc.state)
+       }
+       return nil
 }
 
 type lockBatchRequestBody struct {
@@ -436,7 +439,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) 
bool {
                _mq := lockedMQ[idx]
                v, exist := dc.processQueueTable.Load(_mq)
                if exist {
-                       pq := v.(*ProcessQueue)
+                       pq := v.(*processQueue)
                        pq.locked = true
                        pq.lastConsumeTime = time.Now()
                }
@@ -486,7 +489,7 @@ func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
                        _mq := lockedMQ[idx]
                        v, exist := dc.processQueueTable.Load(_mq)
                        if exist {
-                               pq := v.(*ProcessQueue)
+                               pq := v.(*processQueue)
                                pq.locked = true
                                pq.lastConsumeTime = time.Now()
                        }
@@ -497,7 +500,7 @@ func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
                        if !set[_mq.HashCode()] {
                                v, exist := dc.processQueueTable.Load(_mq)
                                if exist {
-                                       pq := v.(*ProcessQueue)
+                                       pq := v.(*processQueue)
                                        pq.locked = true
                                        pq.lastLockTime = time.Now()
                                        rlog.Warnf("the message queue: %s 
locked Failed, Group: %s", mq.String(), dc.consumerGroup)
@@ -527,7 +530,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
                        _mq := mqs[idx]
                        v, exist := dc.processQueueTable.Load(_mq)
                        if exist {
-                               v.(*ProcessQueue).locked = false
+                               v.(*processQueue).locked = false
                                rlog.Warnf("the message queue: %s locked 
Failed, Group: %s", _mq.String(), dc.consumerGroup)
                        }
                }
@@ -537,7 +540,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) 
[]kernel.MessageQueue {
        data, _ := json.Marshal(body)
        request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
-       response, err := remote.InvokeSync(addr, request, 1*time.Second)
+       response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
        if err != nil {
                rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
                return nil
@@ -557,12 +560,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body 
*lockBatchRequestBody, one
        data, _ := json.Marshal(body)
        request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
        if oneway {
-               err := remote.InvokeOneWay(addr, request, 3*time.Second)
+               err := dc.client.InvokeOneWay(addr, request, 3*time.Second)
                if err != nil {
                        rlog.Errorf("lock mq to broker with oneway: %s error 
%s", addr, err.Error())
                }
        } else {
-               response, err := remote.InvokeSync(addr, request, 1*time.Second)
+               response, err := dc.client.InvokeSync(addr, request, 
1*time.Second)
                if err != nil {
                        rlog.Errorf("lock mq to broker: %s error %s", addr, 
err.Error())
                }
@@ -599,12 +602,13 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*kernel.M
        // TODO
        dc.processQueueTable.Range(func(key, value interface{}) bool {
                mq := key.(*kernel.MessageQueue)
-               pq := value.(*ProcessQueue)
+               pq := value.(*processQueue)
                if mq.Topic == topic {
                        if !mqSet[mq] {
                                pq.dropped = true
                                if dc.removeUnnecessaryMessageQueue(mq, pq) {
-                                       delete(mqSet, mq)
+                                       //delete(mqSet, mq)
+                                       dc.processQueueTable.Delete(key)
                                        changed = true
                                        rlog.Infof("do defaultConsumer, 
Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
                                }
@@ -640,7 +644,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*kernel.M
                                        rlog.Debugf("do defaultConsumer, Group: 
%s, mq already exist, %s", dc.consumerGroup, mq.String())
                                } else {
                                        rlog.Infof("do defaultConsumer, Group: 
%s, add a new mq, %s", dc.consumerGroup, mq.String())
-                                       pq := &ProcessQueue{}
+                                       pq := newProcessQueue()
                                        dc.processQueueTable.Store(mq, pq)
                                        pr := PullRequest{
                                                consumerGroup: dc.consumerGroup,
@@ -660,12 +664,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*kernel.M
        return changed
 }
 
-func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq 
*kernel.MessageQueue, pq *ProcessQueue) bool {
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq 
*kernel.MessageQueue, pq *processQueue) bool {
        dc.storage.persist([]*kernel.MessageQueue{mq})
        dc.storage.remove(mq)
-       if dc.cType == _PushConsume && dc.consumeOrderly && Clustering == 
dc.model {
-               // TODO
-       }
        return true
 }
 
@@ -684,7 +685,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                                if strings.HasPrefix(mq.Topic, 
kernel.RetryGroupTopicPrefix) {
                                        lastOffset = 0
                                } else {
-                                       lastOffset, err := 
kernel.QueryMaxOffset(mq)
+                                       lastOffset, err := dc.queryMaxOffset(mq)
                                        if err == nil {
                                                result = lastOffset
                                        } else {
@@ -701,7 +702,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                case ConsumeFromTimestamp:
                        if lastOffset == -1 {
                                if strings.HasPrefix(mq.Topic, 
kernel.RetryGroupTopicPrefix) {
-                                       lastOffset, err := 
kernel.QueryMaxOffset(mq)
+                                       lastOffset, err := dc.queryMaxOffset(mq)
                                        if err == nil {
                                                result = lastOffset
                                        } else {
@@ -713,7 +714,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                                        if err != nil {
                                                result = -1
                                        } else {
-                                               lastOffset, err := 
kernel.SearchOffsetByTimestamp(mq, t.Unix())
+                                               lastOffset, err := 
dc.searchOffsetByTimestamp(mq, t.Unix())
                                                if err != nil {
                                                        result = -1
                                                } else {
@@ -741,7 +742,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) 
[]string {
                        ConsumerGroup: dc.consumerGroup,
                }
                cmd := 
remote.NewRemotingCommand(kernel.ReqGetConsumerListByGroup, req, nil)
-               res, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second) 
// TODO 超时机制有问题
+               res, err := dc.client.InvokeSync(brokerAddr, cmd, 
3*time.Second) // TODO 超时机制有问题
                if err != nil {
                        rlog.Errorf("get consumer list of [%s] from %s error: 
%s", dc.consumerGroup, brokerAddr, err.Error())
                        return nil
@@ -757,6 +758,61 @@ func (dc *defaultConsumer) findConsumerList(topic string) 
[]string {
        return nil
 }
 
+func (dc *defaultConsumer) sendBack(msg *kernel.MessageExt, level int) error {
+       return nil
+}
+
+// QueryMaxOffset with specific queueId and topic
+func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, 
error) {
+       brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
+       if brokerAddr == "" {
+               kernel.UpdateTopicRouteInfo(mq.Topic)
+               brokerAddr = kernel.FindBrokerAddrByName(mq.BrokerName)
+       }
+       if brokerAddr == "" {
+               return -1, fmt.Errorf("the broker [%s] does not exist", 
mq.BrokerName)
+       }
+
+       request := &kernel.GetMaxOffsetRequest{
+               Topic:   mq.Topic,
+               QueueId: mq.QueueId,
+       }
+
+       cmd := remote.NewRemotingCommand(kernel.ReqGetMaxOffset, request, nil)
+       response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+       if err != nil {
+               return -1, err
+       }
+
+       return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+// SearchOffsetByTimestamp with specific queueId and topic
+func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, 
timestamp int64) (int64, error) {
+       brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
+       if brokerAddr == "" {
+               kernel.UpdateTopicRouteInfo(mq.Topic)
+               brokerAddr = kernel.FindBrokerAddrByName(mq.BrokerName)
+       }
+       if brokerAddr == "" {
+               return -1, fmt.Errorf("the broker [%s] does not exist", 
mq.BrokerName)
+       }
+
+       request := &kernel.SearchOffsetRequest{
+               Topic:     mq.Topic,
+               QueueId:   mq.QueueId,
+               Timestamp: timestamp,
+       }
+
+       cmd := remote.NewRemotingCommand(kernel.ReqSearchOffsetByTimestamp, 
request, nil)
+       response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+       if err != nil {
+               return -1, err
+       }
+
+       return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
 func buildSubscriptionData(topic string, selector MessageSelector) 
*kernel.SubscriptionData {
        subData := &kernel.SubscriptionData{
                Topic:     topic,
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 6b6d719..e368945 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -50,7 +50,6 @@ func init() {
 }
 
 type OffsetStore interface {
-       load()
        persist(mqs []*kernel.MessageQueue)
        remove(mq *kernel.MessageQueue)
        read(mq *kernel.MessageQueue, t readType) int64
@@ -110,7 +109,7 @@ func (local *localFileOffsetStore) read(mq 
*kernel.MessageQueue, t readType) int
 }
 
 func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
-       rlog.Infof("update offset: %s to %d", mq, offset)
+       rlog.Debugf("update offset: %s to %d", mq, offset)
        localOffset, exist := local.OffsetTable[mq.Topic]
        if !exist {
                localOffset = make(map[int]*queueOffset)
@@ -163,35 +162,33 @@ func (local *localFileOffsetStore) persist(mqs 
[]*kernel.MessageQueue) {
 }
 
 func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
-       // unsupported
+       // nothing to do
 }
 
 type remoteBrokerOffsetStore struct {
        group       string
        OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+       client      *kernel.RMQClient
        mutex       sync.RWMutex
 }
 
-func NewRemoteOffsetStore(group string) OffsetStore {
+func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore {
        return &remoteBrokerOffsetStore{
                group:       group,
+               client:      client,
                OffsetTable: make(map[string]map[int]*queueOffset),
        }
 }
 
-func (remote *remoteBrokerOffsetStore) load() {
-       // unsupported
-}
-
-func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
-       remote.mutex.Lock()
-       defer remote.mutex.Unlock()
+func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+       r.mutex.Lock()
+       defer r.mutex.Unlock()
        if len(mqs) == 0 {
                return
        }
        for idx := range mqs {
                mq := mqs[idx]
-               offsets, exist := remote.OffsetTable[mq.Topic]
+               offsets, exist := r.OffsetTable[mq.Topic]
                if !exist {
                        continue
                }
@@ -200,23 +197,23 @@ func (remote *remoteBrokerOffsetStore) persist(mqs 
[]*kernel.MessageQueue) {
                        continue
                }
 
-               err := updateConsumeOffsetToBroker(remote.group, mq.Topic, off)
+               err := r.updateConsumeOffsetToBroker(r.group, mq.Topic, off)
                if err != nil {
                        rlog.Warnf("update offset to broker error: %s, group: 
%s, queue: %s, offset: %d",
-                               err.Error(), remote.group, mq.String(), 
off.Offset)
+                               err.Error(), r.group, mq.String(), off.Offset)
                } else {
-                       rlog.Infof("update offset to broker success, group: %s, 
topic: %s, queue: %v", remote.group, mq.Topic, off)
+                       rlog.Debugf("update offset to broker success, group: 
%s, topic: %s, queue: %v", r.group, mq.Topic, off)
                }
        }
 }
 
-func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
-       remote.mutex.Lock()
-       defer remote.mutex.Unlock()
+func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+       r.mutex.Lock()
+       defer r.mutex.Unlock()
        if mq == nil {
                return
        }
-       offset, exist := remote.OffsetTable[mq.Topic]
+       offset, exist := r.OffsetTable[mq.Topic]
        if !exist {
                return
        }
@@ -224,38 +221,38 @@ func (remote *remoteBrokerOffsetStore) remove(mq 
*kernel.MessageQueue) {
        delete(offset, mq.QueueId)
 }
 
-func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t 
readType) int64 {
-       remote.mutex.RLock()
+func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) 
int64 {
+       r.mutex.RLock()
        if t == _ReadFromMemory || t == _ReadMemoryThenStore {
-               off := readFromMemory(remote.OffsetTable, mq)
+               off := readFromMemory(r.OffsetTable, mq)
                if off >= 0 || (off == -1 && t == _ReadFromMemory) {
-                       remote.mutex.RUnlock()
+                       r.mutex.RUnlock()
                        return off
                }
        }
-       off, err := fetchConsumeOffsetFromBroker(remote.group, mq)
+       off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
        if err != nil {
                rlog.Errorf("fetch offset of %s error: %s", mq.String(), 
err.Error())
-               remote.mutex.RUnlock()
+               r.mutex.RUnlock()
                return -1
        }
-       remote.mutex.RUnlock()
-       remote.update(mq, off, true)
+       r.mutex.RUnlock()
+       r.update(mq, off, true)
        return off
 }
 
-func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
-       rlog.Infof("update offset: %s to %d", mq, offset)
-       remote.mutex.Lock()
-       defer remote.mutex.Unlock()
-       localOffset, exist := remote.OffsetTable[mq.Topic]
+func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
+       rlog.Debugf("update offset: %s to %d", mq, offset)
+       r.mutex.Lock()
+       defer r.mutex.Unlock()
+       localOffset, exist := r.OffsetTable[mq.Topic]
        if !exist {
                localOffset = make(map[int]*queueOffset)
-               remote.OffsetTable[mq.Topic] = localOffset
+               r.OffsetTable[mq.Topic] = localOffset
        }
        q, exist := localOffset[mq.QueueId]
        if !exist {
-               rlog.Infof("new queueOffset: %d, off: %d", mq.QueueId, offset)
+               rlog.Infof("add a new queue: %s, off: %d", mq.String(), offset)
                q = &queueOffset{
                        QueueID: mq.QueueId,
                        Broker:  mq.BrokerName,
@@ -271,20 +268,7 @@ func (remote *remoteBrokerOffsetStore) update(mq 
*kernel.MessageQueue, offset in
        }
 }
 
-func readFromMemory(table map[string]map[int]*queueOffset, mq 
*kernel.MessageQueue) int64 {
-       localOffset, exist := table[mq.Topic]
-       if !exist {
-               return -1
-       }
-       off, exist := localOffset[mq.QueueId]
-       if !exist {
-               return -1
-       }
-
-       return off.Offset
-}
-
-func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) 
(int64, error) {
+func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, 
mq *kernel.MessageQueue) (int64, error) {
        broker := kernel.FindBrokerAddrByName(mq.BrokerName)
        if broker == "" {
                kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -299,7 +283,7 @@ func fetchConsumeOffsetFromBroker(group string, mq 
*kernel.MessageQueue) (int64,
                QueueId:       mq.QueueId,
        }
        cmd := remote.NewRemotingCommand(kernel.ReqQueryConsumerOffset, 
queryOffsetRequest, nil)
-       res, err := remote.InvokeSync(broker, cmd, 3*time.Second)
+       res, err := r.client.InvokeSync(broker, cmd, 3*time.Second)
        if err != nil {
                return -1, err
        }
@@ -316,7 +300,7 @@ func fetchConsumeOffsetFromBroker(group string, mq 
*kernel.MessageQueue) (int64,
        return off, nil
 }
 
-func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) 
error {
+func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic 
string, queue *queueOffset) error {
        broker := kernel.FindBrokerAddrByName(queue.Broker)
        if broker == "" {
                kernel.UpdateTopicRouteInfo(topic)
@@ -333,5 +317,18 @@ func updateConsumeOffsetToBroker(group, topic string, 
queue *queueOffset) error
                CommitOffset:  queue.Offset,
        }
        cmd := remote.NewRemotingCommand(kernel.ReqUpdateConsumerOffset, 
updateOffsetRequest, nil)
-       return remote.InvokeOneWay(broker, cmd, 5*time.Second)
+       return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
+}
+
+func readFromMemory(table map[string]map[int]*queueOffset, mq 
*kernel.MessageQueue) int64 {
+       localOffset, exist := table[mq.Topic]
+       if !exist {
+               return -1
+       }
+       off, exist := localOffset[mq.QueueId]
+       if !exist {
+               return -1
+       }
+
+       return off.Offset
 }
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index f4367f9..77e84fa 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -18,9 +18,13 @@ limitations under the License.
 package consumer
 
 import (
-       "container/list"
        "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/emirpasic/gods/maps/treemap"
+       "github.com/emirpasic/gods/utils"
+       "strconv"
        "sync"
+       "sync/atomic"
        "time"
 )
 
@@ -30,10 +34,10 @@ const (
        _PullMaxIdleTime      = 120 * time.Second
 )
 
-type ProcessQueue struct {
+type processQueue struct {
+       msgCache                   *treemap.Map
        mutex                      sync.RWMutex
-       msgCache                   list.List // sorted
-       cachedMsgCount             int
+       cachedMsgCount             int64
        cachedMsgSize              int64
        consumeLock                sync.Mutex
        consumingMsgOrderlyTreeMap sync.Map
@@ -46,61 +50,192 @@ type ProcessQueue struct {
        lastLockTime               time.Time
        consuming                  bool
        msgAccCnt                  int64
-       once                       sync.Once
+       lockConsume                sync.Mutex
+       msgCh                      chan []*kernel.MessageExt
 }
 
-func (pq *ProcessQueue) isPullExpired() bool {
-       return false
-}
-
-func (pq *ProcessQueue) getMaxSpan() int {
-       return pq.msgCache.Len()
+func newProcessQueue() *processQueue {
+       pq := &processQueue{
+               msgCache:        treemap.NewWith(utils.Int64Comparator),
+               lastPullTime:    time.Now(),
+               lastConsumeTime: time.Now(),
+               lastLockTime:    time.Now(),
+               msgCh:           make(chan []*kernel.MessageExt, 32),
+       }
+       return pq
 }
 
-func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
-       pq.once.Do(func() {
-               pq.msgCache.Init()
-       })
-       localList := list.New()
-       for idx := range messages {
-               localList.PushBack(messages[idx])
-               pq.queueOffsetMax = messages[idx].QueueOffset
+func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
+       if messages == nil || len(messages) == 0 {
+               return
        }
        pq.mutex.Lock()
-       pq.msgCache.PushBackList(localList)
+       pq.msgCh <- messages // 放锁外面会挂
+       validMessageCount := 0
+       for idx := range messages {
+               msg := messages[idx]
+               _, found := pq.msgCache.Get(msg.QueueOffset)
+               if found {
+                       continue
+               }
+               pq.msgCache.Put(msg.QueueOffset, msg)
+               validMessageCount++
+               pq.queueOffsetMax = msg.QueueOffset
+               atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))
+       }
        pq.mutex.Unlock()
+
+       atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
+
+       if pq.msgCache.Size() > 0 && !pq.consuming {
+               pq.consuming = true
+       }
+
+       msg := messages[len(messages)-1]
+       maxOffset, err := 
strconv.ParseInt(msg.Properties[kernel.PropertyMaxOffset], 10, 64)
+       if err == nil {
+               acc := maxOffset - msg.QueueOffset
+               if acc > 0 {
+                       pq.msgAccCnt = acc
+               }
+       }
 }
 
-func (pq *ProcessQueue) removeMessage(number int) int64 {
-       result := pq.queueOffsetMax + 1
+func (pq *processQueue) removeMessage(messages ...*kernel.MessageExt) int64 {
+       result := int64(-1)
        pq.mutex.Lock()
-       for i := 0; i < number && pq.msgCache.Len() > 0; i++ {
-               head := pq.msgCache.Front()
-               pq.msgCache.Remove(head)
-               result = head.Value.(*kernel.MessageExt).QueueOffset
+       pq.lastConsumeTime = time.Now()
+       if !pq.msgCache.Empty() {
+               result = pq.queueOffsetMax + 1
+               removedCount := 0
+               for idx := range messages {
+                       msg := messages[idx]
+                       _, found := pq.msgCache.Get(msg.QueueOffset)
+                       if !found {
+                               continue
+                       }
+                       pq.msgCache.Remove(msg.QueueOffset)
+                       removedCount++
+                       atomic.AddInt64(&pq.cachedMsgSize, 
int64(-len(msg.Body)))
+               }
+               atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))
        }
-       pq.mutex.Unlock()
-       if pq.msgCache.Len() > 0 {
-               result = 
pq.msgCache.Front().Value.(*kernel.MessageExt).QueueOffset
+       if !pq.msgCache.Empty() {
+               first, _ := pq.msgCache.Min()
+               result = first.(int64)
        }
+       pq.mutex.Unlock()
        return result
 }
 
-func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
-       for pq.msgCache.Len() == 0 {
+func (pq *processQueue) isLockExpired() bool {
+       return time.Now().Sub(pq.lastLockTime) > _RebalanceLockMaxTime
+}
+
+func (pq *processQueue) isPullExpired() bool {
+       return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
+}
+
+func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
+       if consumer.option.ConsumeOrderly {
+               return
+       }
+       var loop = 16
+       if pq.msgCache.Size() < 16 {
+               loop = pq.msgCache.Size()
+       }
+
+       for i := 0; i < loop; i++ {
+               pq.mutex.RLock()
+               if pq.msgCache.Empty() {
+                       pq.mutex.RUnlock()
+                       return
+               }
+               _, firstValue := pq.msgCache.Min()
+               msg := firstValue.(*kernel.MessageExt)
+               startTime := msg.Properties[kernel.PropertyConsumeStartTime]
+               if startTime != "" {
+                       st, err := strconv.ParseInt(startTime, 10, 64)
+                       if err != nil {
+                               rlog.Warnf("parse message start consume time error: %s, origin str is: %s", err.Error(), startTime)
+                               continue
+                       }
+                       if time.Now().Unix()-st <= 
int64(consumer.option.ConsumeTimeout) {
+                               pq.mutex.RUnlock()
+                               return
+                       }
+               }
+               pq.mutex.RUnlock()
+
+               err := consumer.sendBack(msg, 3)
+               if err != nil {
+                       rlog.Errorf("send message back to broker error: %s when 
clean expired messages", err.Error())
+                       continue
+               }
+               pq.removeMessage(msg)
+       }
+}
+
+func (pq *processQueue) getMaxSpan() int {
+       pq.mutex.RLock()
+       defer pq.mutex.RUnlock()
+       if pq.msgCache.Size() == 0 {
+               return 0
+       }
+       firstKey, _ := pq.msgCache.Min()
+       lastKey, _ := pq.msgCache.Max()
+       return int(lastKey.(int64) - firstKey.(int64))
+}
+
+func (pq *processQueue) getMessages() []*kernel.MessageExt {
+       return <-pq.msgCh
+}
+
+func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
+       for pq.msgCache.Empty() {
                time.Sleep(10 * time.Millisecond)
        }
        result := make([]*kernel.MessageExt, number)
        i := 0
        pq.mutex.Lock()
        for ; i < number; i++ {
-               e := pq.msgCache.Front()
-               if e == nil {
+               k, v := pq.msgCache.Min()
+               if v == nil {
                        break
                }
-               result[i] = e.Value.(*kernel.MessageExt)
-               pq.msgCache.Remove(e)
+               result[i] = v.(*kernel.MessageExt)
+               pq.msgCache.Remove(k)
        }
        pq.mutex.Unlock()
        return result[:i]
 }
+
+func (pq *processQueue) Min() int64 {
+       if pq.msgCache.Empty() {
+               return -1
+       }
+       k, _ := pq.msgCache.Min()
+       if k != nil {
+               return k.(int64)
+       }
+       return -1
+}
+
+func (pq *processQueue) Max() int64 {
+       if pq.msgCache.Empty() {
+               return -1
+       }
+       k, _ := pq.msgCache.Max()
+       if k != nil {
+               return k.(int64)
+       }
+       return -1
+}
+
+func (pq *processQueue) clear() {
+       pq.mutex.Lock()
+       defer pq.mutex.Unlock()
+       pq.msgCache.Clear()
+       pq.cachedMsgCount = 0
+       pq.cachedMsgSize = 0
+       pq.queueOffsetMax = 0
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index fc78e3e..2039be1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "github.com/apache/rocketmq-client-go/kernel"
        "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
        "math"
        "strconv"
        "time"
@@ -55,11 +56,13 @@ type pushConsumer struct {
        queueFlowControlTimes        int
        queueMaxSpanFlowControlTimes int
        consume                      func(*ConsumeMessageContext, 
[]*kernel.MessageExt) (ConsumeResult, error)
-       submitToConsume              func(*ProcessQueue, *kernel.MessageQueue)
+       submitToConsume              func(*processQueue, *kernel.MessageQueue)
        subscribedTopic              map[string]string
 }
 
 func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+       opt.InstanceName = "DEFAULT"
+       opt.ClientIP = utils.LocalIP()
        dc := &defaultConsumer{
                consumerGroup:  consumerGroup,
                cType:          _PushConsume,
@@ -119,11 +122,10 @@ func (pc *pushConsumer) Start() error {
                pc.client = 
kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
                if pc.model == Clustering {
                        pc.option.ChangeInstanceNameToPID()
-                       pc.storage = NewRemoteOffsetStore(pc.consumerGroup)
+                       pc.storage = NewRemoteOffsetStore(pc.consumerGroup, 
pc.client)
                } else {
                        pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, 
pc.client.ClientID())
                }
-               pc.storage.load()
                go func() {
                        // todo start clean msg expired
                        // TODO quit
@@ -263,7 +265,7 @@ func (pc *pushConsumer) validate() {
 
        if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
                if pc.option.PullBatchSize == 0 {
-                       pc.option.PullBatchSize = 1
+                       pc.option.PullBatchSize = 32
                } else {
                        rlog.Fatal("option.PullBatchSize out of range [1, 
1024]")
                }
@@ -293,7 +295,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                sleepTime = pc.option.PullInterval
                pq.lastPullTime = time.Now()
                err := pc.makeSureStateOK()
-               rlog.Debugf("pull MessageQueue: %d", request.mq.QueueId)
                if err != nil {
                        rlog.Warnf("consumer state error: %s", err.Error())
                        sleepTime = _PullDelayTimeWhenError
@@ -312,8 +313,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        if pc.queueFlowControlTimes%1000 == 0 {
                                rlog.Warnf("the cached message count exceeds 
the threshold %d, so do flow control, "+
                                        "minOffset=%d, maxOffset=%d, count=%d, 
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
-                                       pc.option.PullThresholdForQueue, 0, 
pq.msgCache.Front().Value.(int64),
-                                       pq.msgCache.Back().Value.(int64),
+                                       pc.option.PullThresholdForQueue, 0, 
pq.Min(), pq.Max(),
                                        pq.msgCache, cachedMessageSizeInMiB, 
request.String(), pc.queueFlowControlTimes)
                        }
                        pc.queueFlowControlTimes++
@@ -325,8 +325,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        if pc.queueFlowControlTimes%1000 == 0 {
                                rlog.Warnf("the cached message size exceeds the 
threshold %d MiB, so do flow control, "+
                                        "minOffset=%d, maxOffset=%d, count=%d, 
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
-                                       pc.option.PullThresholdSizeForQueue, 0, 
//processQueue.getMsgTreeMap().firstKey(),
-                                       0, // TODO 
processQueue.getMsgTreeMap().lastKey(),
+                                       pc.option.PullThresholdSizeForQueue, 
pq.Min(), pq.Max(),
                                        pq.msgCache, cachedMessageSizeInMiB, 
request.String(), pc.queueFlowControlTimes)
                        }
                        pc.queueFlowControlTimes++
@@ -338,12 +337,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        if pq.getMaxSpan() > 
pc.option.ConsumeConcurrentlyMaxSpan {
 
                                if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
-                                       rlog.Warnf("the queue's messages, span 
too long, so do flow control, minOffset=%d, "+
-                                               "maxOffset=%d, maxSpan=%d, 
pullRequest=%s, flowControlTimes=%d",
-                                               0, 
//processQueue.getMsgTreeMap().firstKey(),
-                                               0, // 
processQueue.getMsgTreeMap().lastKey(),
-                                               pq.getMaxSpan(),
-                                               request.String(), 
pc.queueMaxSpanFlowControlTimes)
+                                       rlog.Warnf("the queue's messages, span 
too long, limit=%d, so do flow control, minOffset=%d, "+
+                                               "maxOffset=%d, maxSpan=%d, 
pullRequest=%s, flowControlTimes=%d", pc.option.ConsumeConcurrentlyMaxSpan,
+                                               pq.Min(), pq.Max(), 
pq.getMaxSpan(), request.String(), pc.queueMaxSpanFlowControlTimes)
                                }
                                sleepTime = _PullDelayTimeWhenFlowControl
                                goto NEXT
@@ -452,14 +448,14 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                        if msgFounded != nil && len(msgFounded) != 0 {
                                firstMsgOffset = msgFounded[0].QueueOffset
                                increasePullTPS(pc.consumerGroup, 
request.mq.Topic, len(msgFounded))
-                               pq.putMessage(msgFounded)
+                               pq.putMessage(msgFounded...)
                        }
                        if result.NextBeginOffset < prevRequestOffset || 
firstMsgOffset < prevRequestOffset {
                                rlog.Warnf("[BUG] pull message result maybe 
data wrong, [nextBeginOffset=%s, "+
                                        "firstMsgOffset=%d, 
prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, 
prevRequestOffset)
                        }
                case kernel.PullNoNewMsg:
-                       rlog.Infof("Topic: %s, QueueId: %d, no more msg", 
request.mq.Topic, request.mq.QueueId)
+                       rlog.Debugf("Topic: %s, QueueId: %d no more msg, next 
offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
                case kernel.PullNoMsgMatched:
                        request.nextOffset = result.NextBeginOffset
                        pc.correctTagsOffset(request)
@@ -489,6 +485,87 @@ func (pc *pushConsumer) sendMessageBack(ctx 
*ConsumeMessageContext, msg *kernel.
        return true
 }
 
+func (pc *pushConsumer) suspend() {
+       pc.pause = true
+       rlog.Infof("suspend consumer: %s", pc.consumerGroup)
+}
+
+func (pc *pushConsumer) resume() {
+       pc.pause = false
+       pc.doBalance()
+       rlog.Infof("resume consumer: %s", pc.consumerGroup)
+}
+
+func (pc *pushConsumer) resetOffset(topic string, table 
map[kernel.MessageQueue]int64) {
+       //topic := cmd.ExtFields["topic"]
+       //group := cmd.ExtFields["group"]
+       //if topic == "" || group == "" {
+       //      rlog.Warnf("received reset offset command from: %s, but missing 
params.", from)
+       //      return
+       //}
+       //t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
+       //if err != nil {
+       //      rlog.Warnf("received reset offset command from: %s, but parse 
time error: %s", err.Error())
+       //      return
+       //}
+       //rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, 
topic=%s, group=%s, timestamp=%v",
+       //      from, topic, group, t)
+       //
+       //offsetTable := make(map[kernel.MessageQueue]int64, 0)
+       //err = json.Unmarshal(cmd.Body, &offsetTable)
+       //if err != nil {
+       //      rlog.Warnf("received reset offset command from: %s, but parse 
offset table: %s", err.Error())
+       //      return
+       //}
+       //v, exist := c.consumerMap.Load(group)
+       //if !exist {
+       //      rlog.Infof("[reset-offset] consumer dose not exist. group=%s", 
group)
+       //      return
+       //}
+
+       set := make(map[int]*kernel.MessageQueue, 0)
+       for k := range table {
+               set[k.HashCode()] = &k
+       }
+       pc.processQueueTable.Range(func(key, value interface{}) bool {
+               mqHash := value.(int)
+               pq := value.(*processQueue)
+               if set[mqHash] != nil {
+                       pq.dropped = true
+                       pq.clear()
+               }
+               return true
+       })
+       time.Sleep(10 * time.Second)
+       v, exist := pc.topicSubscribeInfoTable.Load(topic)
+       if !exist {
+               return
+       }
+       queuesOfTopic := v.(map[int]*kernel.MessageQueue)
+       for k := range queuesOfTopic {
+               q := set[k]
+               if q != nil {
+                       pc.storage.update(q, table[*q], false)
+                       v, exist := pc.processQueueTable.Load(k)
+                       if !exist {
+                               continue
+                       }
+                       pq := v.(*processQueue)
+                       pc.removeUnnecessaryMessageQueue(q, pq)
+                       delete(queuesOfTopic, k)
+               }
+       }
+}
+
+func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, 
pq *processQueue) bool {
+       pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
+       if !pc.consumeOrderly || Clustering != pc.model {
+               return true
+       }
+       // TODO orderly
+       return true
+}
+
 type ConsumeMessageContext struct {
        consumerGroup string
        msgs          []*kernel.MessageExt
@@ -499,8 +576,8 @@ type ConsumeMessageContext struct {
        properties map[string]string
 }
 
-func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq 
*kernel.MessageQueue) {
-       msgs := pq.takeMessages(32)
+func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq 
*kernel.MessageQueue) {
+       msgs := pq.getMessages()
        if msgs == nil {
                return
        }
@@ -573,7 +650,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*ProcessQueue, mq *kernel.Mes
                                        }
                                }
 
-                               offset := pq.removeMessage(len(subMsgs))
+                               offset := pq.removeMessage(subMsgs...)
 
                                if offset >= 0 && !pq.dropped {
                                        pc.storage.update(mq, int64(offset), 
true)
@@ -591,5 +668,5 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*ProcessQueue, mq *kernel.Mes
        }
 }
 
-func (pc *pushConsumer) consumeMessageOrderly(pq *ProcessQueue, mq 
*kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq 
*kernel.MessageQueue) {
 }
diff --git a/core/api.go b/core/api.go
index 8a6fdd3..6969d1f 100644
--- a/core/api.go
+++ b/core/api.go
@@ -48,12 +48,12 @@ func (config *ClientConfig) String() string {
        return str
 }
 
-// NewProducer create a new producer with config
+// NewProducer create a new producer with config
 func NewProducer(config *ProducerConfig) (Producer, error) {
        return newDefaultProducer(config)
 }
 
-// ProducerConfig define a producer
+// ProducerConfig define a producer
 type ProducerConfig struct {
        ClientConfig
        SendMsgTimeout int
diff --git a/core/error.go b/core/error.go
index 6be7883..5f708ee 100644
--- a/core/error.go
+++ b/core/error.go
@@ -46,7 +46,7 @@ func (e rmqError) Error() string {
        case ErrMallocFailed:
                return "malloc memory failed"
        case ErrProducerStartFailed:
-               return "start producer failed"
+               return "start producer failed"
        case ErrSendSyncFailed:
                return "send message with sync failed"
        case ErrSendOrderlyFailed:
diff --git a/core/producer.go b/core/producer.go
index 22b49ed..e75e0ff 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -178,7 +178,7 @@ func (p *defaultProducer) String() string {
        return p.config.String()
 }
 
-// Start the producer.
+// Start the producer.
 func (p *defaultProducer) Start() error {
        err := rmqError(C.StartProducer(p.cproduer))
        if err != NIL {
@@ -187,7 +187,7 @@ func (p *defaultProducer) Start() error {
        return nil
 }
 
-// Shutdown the producer.
+// Shutdown the producer.
 func (p *defaultProducer) Shutdown() error {
        err := rmqError(C.ShutdownProducer(p.cproduer))
 
diff --git a/docs/feature.md b/docs/feature.md
index bc23be2..9caa789 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -1,92 +1,72 @@
 # Feature
 
 ## Producer
-- [ ] ProducerType
-    - [ ] DefaultProducer
-    - [ ] TransactionProducer
-- [ ] API
-    - [ ] Send
-        - [ ] Sync
-        - [ ] Async
-        - [ ] OneWay
-- [ ] Other
-    - [ ] DelayMessage
-    - [ ] Config
-    - [ ] MessageId Generate
-    - [ ] CompressMsg
-    - [ ] TimeOut
-    - [ ] LoadBalance
-    - [ ] DefaultTopic
-    - [ ] VipChannel
-    - [ ] Retry
-    - [ ] SendMessageHook
-    - [ ] CheckRequestQueue
-    - [ ] CheckForbiddenHookList
-    - [ ] MQFaultStrategy
+
+### MessageType
+- [x] NormalMessage
+- [ ] TransactionMessage
+- [ ] DelayMessage
+
+### SendWith    
+- [x] Sync
+- [ ] Async
+- [x] OneWay
+
+### Other    
+- [ ] Config
+- [ ] MessageId Generate
+- [ ] CompressMsg
+- [ ] LoadBalance
+- [ ] DefaultTopic
+- [ ] VipChannel
+- [ ] Retry
+- [ ] Hook
+- [ ] CheckRequestQueue
+- [ ] MQFaultStrategy
 
 ## Consumer
-- [ ] ConsumerType
-    - [ ] PushConsumer
-    - [ ] PullConsumer
-- [ ] MessageListener
-    - [ ] Concurrently
-    - [ ] Orderly
-- [ ] MessageModel
-    - [ ] CLUSTERING
-    - [ ] BROADCASTING
-- [ ] OffsetStore
-    - [ ] RemoteBrokerOffsetStore
-        - [ ] many actions
-    - [ ] LocalFileOffsetStore
-- [ ] RebalanceService
-- [ ] PullMessageService
-- [ ] ConsumeMessageService
-- [ ] AllocateMessageQueueStrategy
-    - [ ] AllocateMessageQueueAveragely
-    - [ ] AllocateMessageQueueAveragelyByCircle
-    - [ ] AllocateMessageQueueByConfig
-    - [ ] AllocateMessageQueueByMachineRoom
-- [ ] Other
-    - [ ] Config
-    - [ ] ZIP
-    - [ ] AllocateMessageQueueStrategy
-    - [ ] ConsumeFromWhere
-        - [ ] CONSUME_FROM_LAST_OFFSET
-        - [ ] CONSUME_FROM_FIRST_OFFSET
-        - [ ] CONSUME_FROM_TIMESTAMP
-    - [ ] Retry(sendMessageBack)
-    - [ ] TimeOut(clearExpiredMessage)
-    - [ ] ACK(partSuccess)
-    - [ ] FlowControl(messageCanNotConsume)
-    - [ ] ConsumeMessageHook
-    - [ ] filterMessageHookList
 
-## Manager
-- [ ] Multiple Request API Wrapper
-    - many functions...
-- [ ] Task
-    - [ ] PollNameServer
-    - [ ] Heartbeat
-    - [ ] UpdateTopicRouteInfoFromNameServer
-    - [ ] CleanOfflineBroker
-    - [ ] PersistAllConsumerOffset
-    - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-- [ ] Processor
-    - [ ] CHECK_TRANSACTION_STATE
-    - [ ] NOTIFY_CONSUMER_IDS_CHANGED
-    - [ ] RESET_CONSUMER_CLIENT_OFFSET
-    - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
-    - [ ] GET_CONSUMER_RUNNING_INFO
-    - [ ] CONSUME_MESSAGE_DIRECTLY
+### ReceiveType
+- [x] Push
+- [ ] Pull
+
+### ConsumingType
+- [x] Concurrently
+- [ ] Orderly
+
+### MessageModel
+- [x] CLUSTERING
+- [x] BROADCASTING
+    
+### AllocateMessageQueueStrategy
+- [x] AllocateMessageQueueAveragely
+- [ ] AllocateMessageQueueAveragelyByCircle
+- [ ] AllocateMessageQueueByConfig
+- [ ] AllocateMessageQueueByMachineRoom
+
+### Other
+- [x] Rebalance
+- [x] Flow Control
+- [ ] compress
+- [x] ConsumeFromWhere
+- [ ] Retry(sendMessageBack)
+- [ ] Hook
+
+## Common
+- [ ] PollNameServer
+- [x] Heartbeat
+- [x] UpdateTopicRouteInfoFromNameServer
+- [ ] CleanOfflineBroker
+- [ ] ClearExpiredMessage(from consumer consumeMessageService)
     
 ## Remoting
-- [ ] API
-    - [ ] InvokeSync
-    - [ ] InvokeAsync
-    - [ ] InvokeOneWay
-- [ ] Serialize
-    - [ ] JSON
-    - [ ] ROCKETMQ
+- [x] API
+    - [x] InvokeSync
+    - [x] InvokeAsync
+    - [x] InvokeOneWay
+- [x] Serialize
+    - [x] JSON
+    - [x] ROCKETMQ
 - [ ] Other
     - [ ] VIPChannel
     - [ ] RPCHook
diff --git a/examples/producer/main.go b/examples/consumer/main.go
similarity index 92%
rename from examples/producer/main.go
rename to examples/consumer/main.go
index e2d0126..2b563ff 100644
--- a/examples/producer/main.go
+++ b/examples/consumer/main.go
@@ -22,6 +22,7 @@ import (
        "github.com/apache/rocketmq-client-go/consumer"
        "github.com/apache/rocketmq-client-go/kernel"
        "os"
+       "sync/atomic"
        "time"
 )
 
@@ -30,9 +31,13 @@ func main() {
                ConsumerModel: consumer.Clustering,
                FromWhere:     consumer.ConsumeFromFirstOffset,
        })
+       var count int64
        err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
                msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-               fmt.Println(msgs)
+               c := atomic.AddInt64(&count, int64(len(msgs)))
+               if c%1000 == 0 {
+                       fmt.Println(c)
+               }
                return consumer.ConsumeSuccess, nil
        })
        if err != nil {
diff --git a/go.mod b/go.mod
index b2ca040..419d8ba 100644
--- a/go.mod
+++ b/go.mod
@@ -3,15 +3,12 @@ module github.com/apache/rocketmq-client-go
 go 1.11
 
 require (
-       github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // 
indirect
-       github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // 
indirect
        github.com/emirpasic/gods v1.12.0
        github.com/sirupsen/logrus v1.3.0
        github.com/stretchr/testify v1.3.0
        github.com/tidwall/gjson v1.2.1
        github.com/tidwall/match v1.0.1 // indirect
        github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
-       gopkg.in/alecthomas/kingpin.v2 v2.2.6
 )
 
 replace (
diff --git a/go.sum b/go.sum
index da32c5e..61c5e19 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,3 @@
-github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc 
h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
-github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
-github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf 
h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
-github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -30,5 +26,3 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 
h1:u+LnwYTOOW7Ukr/fppxEb1
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-gopkg.in/alecthomas/kingpin.v2 v2.2.6 
h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
-gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
diff --git a/kernel/client.go b/kernel/client.go
index e388411..ae258e4 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -47,7 +47,7 @@ const (
        _PersistOffset = 5 * time.Second
 
        // Rebalance interval
-       _RebalanceInterval = 20 * time.Millisecond
+       _RebalanceInterval = 100 * time.Millisecond
 )
 
 var (
@@ -82,7 +82,6 @@ type InnerProducer interface {
        IsPublishTopicNeedUpdate(topic string) bool
        GetCheckListener() func(msg *MessageExt)
        GetTransactionListener() TransactionListener
-       //CheckTransactionState()
        isUnitMode() bool
 }
 
@@ -103,13 +102,25 @@ type RMQClient struct {
        // group -> InnerConsumer
        consumerMap sync.Map
        once        sync.Once
+
+       remoteClient *remote.RemotingClient
 }
 
 var clientMap sync.Map
 
 func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
-       client := &RMQClient{option: option}
-       actual, _ := clientMap.LoadOrStore(client.ClientID(), client)
+       client := &RMQClient{
+               option:       option,
+               remoteClient: remote.NewRemotingClient(),
+       }
+       actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
+       if !loaded {
+               
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req 
*remote.RemotingCommand) *remote.RemotingCommand {
+                       rlog.Infof("receive broker's notification, the consumer 
group: %s", req.ExtFields["consumerGroup"])
+                       client.RebalanceImmediately()
+                       return nil
+               })
+       }
        return actual.(*RMQClient)
 }
 
@@ -129,7 +140,13 @@ func (c *RMQClient) Start() {
                }()
 
                // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
-               go func() {}()
+               go func() {
+                       for {
+                               cleanOfflineBroker()
+                               c.SendHeartbeatToAllBrokerWithLock()
+                               time.Sleep(_HeartbeatBrokerInterval)
+                       }
+               }()
 
                // schedule persist offset
                go func() {
@@ -147,7 +164,7 @@ func (c *RMQClient) Start() {
                go func() {
                        for {
                                c.RebalanceImmediately()
-                               time.Sleep(time.Second)
+                               time.Sleep(_RebalanceInterval)
                        }
                }()
        })
@@ -161,9 +178,20 @@ func (c *RMQClient) ClientID() string {
        return id
 }
 
+func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
+       timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+       return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
+}
+
+func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+       timeoutMillis time.Duration) error {
+       return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
+}
+
 func (c *RMQClient) CheckClientInBroker() {
 }
 
+// TODO
 func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
        hbData := &heartbeatData{
                ClientId: c.ClientID(),
@@ -190,7 +218,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
        hbData.ProducerDatas = pData
        hbData.ConsumerDatas = cData
        if len(pData) == 0 && len(cData) == 0 {
-               rlog.Warn("sending heartbeat, but no consumer and no producer")
+               rlog.Info("sending heartbeat, but no consumer and no producer")
                return
        }
        brokerAddressesMap.Range(func(key, value interface{}) bool {
@@ -198,7 +226,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
                data := value.(*BrokerData)
                for id, addr := range data.BrokerAddresses {
                        cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, 
hbData.encode())
-                       response, err := remote.InvokeSync(addr, cmd, 
3*time.Second)
+                       response, err := c.remoteClient.InvokeSync(addr, cmd, 
3*time.Second)
                        if err != nil {
                                rlog.Warnf("send heart beat to broker error: 
%s", err.Error())
                                return true
@@ -213,7 +241,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
                                        brokerVersionMap.Store(brokerName, m)
                                }
                                m[brokerName] = int32(response.Version)
-                               rlog.Infof("send heart beat to broker[%s %s %s] 
success", brokerName, id, addr)
+                               rlog.Infof("send heart beat to broker[%s %d %s] 
success", brokerName, id, addr)
                        }
                }
                return true
@@ -255,7 +283,7 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
 func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := getRemotingCommand(request, msgs)
-       response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+       response, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 
3*time.Second)
        if err != nil {
                rlog.Warnf("send messages with sync error: %v", err)
                return nil, err
@@ -281,7 +309,7 @@ func (c *RMQClient) SendMessageAsync(ctx context.Context, 
brokerAddrs, brokerNam
 func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
-       err := remote.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
+       err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                rlog.Warnf("send messages with oneway error: %v", err)
        }
@@ -337,7 +365,7 @@ func (c *RMQClient) processSendResponse(brokerName string, 
msgs []*Message, cmd
 // PullMessage with sync
 func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-       res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+       res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                return nil, err
        }
@@ -390,67 +418,6 @@ func (c *RMQClient) PullMessageAsync(ctx context.Context, 
brokerAddrs string, re
        return nil
 }
 
-// QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(mq *MessageQueue) (int64, error) {
-       brokerAddr := FindBrokerAddrByName(mq.BrokerName)
-       if brokerAddr == "" {
-               UpdateTopicRouteInfo(mq.Topic)
-               brokerAddr = FindBrokerAddrByName(mq.Topic)
-       }
-       if brokerAddr == "" {
-               return -1, fmt.Errorf("the broker [%s] does not exist", 
mq.BrokerName)
-       }
-
-       request := &GetMaxOffsetRequest{
-               Topic:   mq.Topic,
-               QueueId: mq.QueueId,
-       }
-
-       cmd := remote.NewRemotingCommand(ReqGetMaxOffset, request, nil)
-       response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
-       if err != nil {
-               return -1, err
-       }
-
-       return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
-}
-
-// QueryConsumerOffset with specific queueId and topic of consumerGroup
-func (c *RMQClient) QueryConsumerOffset(consumerGroup, mq *MessageQueue) 
(int64, error) {
-       return 0, nil
-}
-
-// SearchOffsetByTimestamp with specific queueId and topic
-func SearchOffsetByTimestamp(mq *MessageQueue, timestamp int64) (int64, error) 
{
-       brokerAddr := FindBrokerAddrByName(mq.BrokerName)
-       if brokerAddr == "" {
-               UpdateTopicRouteInfo(mq.Topic)
-               brokerAddr = FindBrokerAddrByName(mq.Topic)
-       }
-       if brokerAddr == "" {
-               return -1, fmt.Errorf("the broker [%s] does not exist", 
mq.BrokerName)
-       }
-
-       request := &SearchOffsetRequest{
-               Topic:     mq.Topic,
-               QueueId:   mq.QueueId,
-               Timestamp: timestamp,
-       }
-
-       cmd := remote.NewRemotingCommand(ReqSearchOffsetByTimestamp, request, 
nil)
-       response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
-       if err != nil {
-               return -1, err
-       }
-
-       return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
-}
-
-// UpdateConsumerOffset with specific queueId and topic
-func (c *RMQClient) UpdateConsumerOffset(consumerGroup, topic string, queue 
int, offset int64) error {
-       return nil
-}
-
 func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
        c.consumerMap.Store(group, consumer)
        return nil
diff --git a/kernel/model.go b/kernel/model.go
index d9f4d00..0ddc4c3 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -242,7 +242,7 @@ type FindBrokerResult struct {
 }
 
 type (
-       // groupName of producer
+       // groupName of producer
        producerData string
 
        consumeType string
diff --git a/kernel/request.go b/kernel/request.go
index 2904242..9f16e44 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -24,18 +24,23 @@ import (
 )
 
 const (
-       ReqSendMessage         = int16(10)
-       ReqPullMessage             = int16(11)
-       ReqQueryConsumerOffset     = int16(14)
-       ReqUpdateConsumerOffset    = int16(15)
-       ReqSearchOffsetByTimestamp = int16(30)
-       ReqGetMaxOffset            = int16(30)
-       ReqHeartBeat               = int16(34)
-       ReqGetConsumerListByGroup  = int16(38)
-       ReqLockBatchMQ             = int16(41)
-       ReqUnlockBatchMQ           = int16(42)
-       ReqGetRouteInfoByTopic     = int16(105)
-       ReqSendBatchMessage        = int16(320)
+       ReqSendMessage              = int16(10)
+       ReqPullMessage              = int16(11)
+       ReqQueryConsumerOffset      = int16(14)
+       ReqUpdateConsumerOffset     = int16(15)
+       ReqSearchOffsetByTimestamp  = int16(29)
+       ReqGetMaxOffset             = int16(30)
+       ReqHeartBeat                = int16(34)
+       ReqGetConsumerListByGroup   = int16(38)
+       ReqLockBatchMQ              = int16(41)
+       ReqUnlockBatchMQ            = int16(42)
+       ReqGetRouteInfoByTopic      = int16(105)
+       ReqSendBatchMessage         = int16(320)
+       ReqCheckTransactionState    = int16(39)
+       ReqNotifyConsumerIdsChanged = int16(40)
+       ReqResetConsuemrOffset      = int16(220)
+       ReqGetConsumerRunningInfo   = int16(307)
+       ReqConsumeMessageDirectly   = int16(309)
 )
 
 type SendMessageRequest struct {
diff --git a/kernel/route.go b/kernel/route.go
index 6a232cf..a1bfe3a 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -42,6 +42,7 @@ const (
 
 var (
        ErrTopicNotExist = errors.New("topic not exist")
+       nameSrvClient    = remote.NewRemotingClient()
 )
 
 var (
@@ -57,6 +58,40 @@ var (
        lockNamesrv  sync.Mutex
 )
 
+func cleanOfflineBroker() {
+       // TODO optimize
+       lockNamesrv.Lock()
+       brokerAddressesMap.Range(func(key, value interface{}) bool {
+               brokerName := key.(string)
+               bd := value.(*BrokerData)
+               for k, v := range bd.BrokerAddresses {
+                       isBrokerAddrExistInTopicRoute := false
+                       routeDataMap.Range(func(key, value interface{}) bool {
+                               trd := value.(*TopicRouteData)
+                               for idx := range trd.BrokerDataList {
+                                       for _, v1 := range 
trd.BrokerDataList[idx].BrokerAddresses {
+                                               if v1 == v {
+                                                       
isBrokerAddrExistInTopicRoute = true
+                                                       return false
+                                               }
+                                       }
+                               }
+                               return true
+                       })
+                       if !isBrokerAddrExistInTopicRoute {
+                               delete(bd.BrokerAddresses, k)
+                               rlog.Infof("the broker [name=%s, ID=%d, addr=%s] is offline, remove it", brokerName, k, v)
+                       }
+               }
+               if len(bd.BrokerAddresses) == 0 {
+                       brokerAddressesMap.Delete(brokerName)
+                       rlog.Infof("all addresses of broker [name=%s] are offline, remove it", brokerName)
+               }
+               return true
+       })
+       lockNamesrv.Unlock()
+}
+
 // key is topic, value is TopicPublishInfo
 type TopicPublishInfo struct {
        OrderTopic          bool
@@ -233,7 +268,7 @@ func queryTopicRouteInfoFromServer(topic string) 
(*TopicRouteData, error) {
                Topic: topic,
        }
        rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-       response, err := remote.InvokeSync(getNameServerAddress(), rc, 
requestTimeout)
+       response, err := nameSrvClient.InvokeSync(getNameServerAddress(), rc, 
requestTimeout)
 
        if err != nil {
                return nil, err
@@ -405,7 +440,25 @@ func (routeData *TopicRouteData) clone() *TopicRouteData {
 }
 
 func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
-       return false
+       if len(routeData.BrokerDataList) != len(data.BrokerDataList) {
+               return false
+       }
+       if len(routeData.QueueDataList) != len(data.QueueDataList) {
+               return false
+       }
+
+       for idx := range routeData.BrokerDataList {
+               if 
!routeData.BrokerDataList[idx].Equals(data.BrokerDataList[idx]) {
+                       return false
+               }
+       }
+
+       for idx := range routeData.QueueDataList {
+               if 
!routeData.QueueDataList[idx].Equals(data.QueueDataList[idx]) {
+                       return false
+               }
+       }
+       return true
 }
 
 func (routeData *TopicRouteData) String() string {
@@ -422,6 +475,30 @@ type QueueData struct {
        TopicSynFlag   int    `json:"topicSynFlag"`
 }
 
+func (q *QueueData) Equals(qd *QueueData) bool {
+       if q.BrokerName != qd.BrokerName {
+               return false
+       }
+
+       if q.ReadQueueNums != qd.ReadQueueNums {
+               return false
+       }
+
+       if q.WriteQueueNums != qd.WriteQueueNums {
+               return false
+       }
+
+       if q.Perm != qd.Perm {
+               return false
+       }
+
+       if q.TopicSynFlag != qd.TopicSynFlag {
+               return false
+       }
+
+       return true
+}
+
 // BrokerData BrokerData
 type BrokerData struct {
        Cluster             string           `json:"cluster"`
@@ -429,3 +506,21 @@ type BrokerData struct {
        BrokerAddresses     map[int64]string `json:"brokerAddrs"`
        brokerAddressesLock sync.RWMutex
 }
+
+func (b *BrokerData) Equals(bd *BrokerData) bool {
+       if b.Cluster != bd.Cluster {
+               return false
+       }
+
+       if b.BrokerName != bd.BrokerName {
+               return false
+       }
+
+       for k, v := range b.BrokerAddresses {
+               if bd.BrokerAddresses[k] != v {
+                       return false
+               }
+       }
+
+       return true
+}
diff --git a/remote/processor.go b/remote/processor.go
deleted file mode 100644
index 1e5d23e..0000000
--- a/remote/processor.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package remote
-
-type ClientRequestProcessor func(remotingCommand *RemotingCommand) 
(responseCommand *RemotingCommand)
-
-//CHECK_TRANSACTION_STATE
-//NOTIFY_CONSUMER_IDS_CHANGED
-//RESET_CONSUMER_CLIENT_OFFSET
-//GET_CONSUMER_STATUS_FROM_CLIENT
-//GET_CONSUMER_RUNNING_INFO
-//CONSUME_MESSAGE_DIRECTLY
diff --git a/remote/remote_client.go b/remote/remote_client.go
index bfe483c..93f81ce 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -85,14 +85,37 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, 
error) {
        }
 }
 
-func InvokeSync(addr string, request *RemotingCommand, timeoutMillis 
time.Duration) (*RemotingCommand, error) {
-       conn, err := connect(addr)
+type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand 
*RemotingCommand)
+
+type TcpOption struct {
+       // TODO
+}
+
+type RemotingClient struct {
+       responseTable   sync.Map
+       connectionTable sync.Map
+       option          TcpOption
+       processors      map[int16]ClientRequestFunc
+}
+
+func NewRemotingClient() *RemotingClient {
+       return &RemotingClient{
+               processors: make(map[int16]ClientRequestFunc),
+       }
+}
+
+func (c *RemotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
+       c.processors[code] = f
+}
+
+func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, 
timeoutMillis time.Duration) (*RemotingCommand, error) {
+       conn, err := c.connect(addr)
        if err != nil {
                return nil, err
        }
        resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
-       responseTable.Store(resp.Opaque, resp)
-       err = sendRequest(conn, request)
+       c.responseTable.Store(resp.Opaque, resp)
+       err = c.sendRequest(conn, request)
        if err != nil {
                return nil, err
        }
@@ -100,14 +123,14 @@ func InvokeSync(addr string, request *RemotingCommand, 
timeoutMillis time.Durati
        return resp.waitResponse()
 }
 
-func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis 
time.Duration, callback func(*ResponseFuture)) error {
-       conn, err := connect(addr)
+func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, 
timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
+       conn, err := c.connect(addr)
        if err != nil {
                return err
        }
        resp := NewResponseFuture(request.Opaque, timeoutMillis, callback)
-       responseTable.Store(resp.Opaque, resp)
-       err = sendRequest(conn, request)
+       c.responseTable.Store(resp.Opaque, resp)
+       err = c.sendRequest(conn, request)
        if err != nil {
                return err
        }
@@ -116,26 +139,21 @@ func InvokeAsync(addr string, request *RemotingCommand, 
timeoutMillis time.Durat
 
 }
 
-func InvokeOneWay(addr string, request *RemotingCommand, timeout 
time.Duration) error {
-       conn, err := connect(addr)
+func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand, 
timeout time.Duration) error {
+       conn, err := c.connect(addr)
        if err != nil {
                return err
        }
-       return sendRequest(conn, request)
+       return c.sendRequest(conn, request)
 }
 
-var (
-       responseTable   sync.Map
-       connectionTable sync.Map
-)
-
-func ScanResponseTable() {
+func (c *RemotingClient) ScanResponseTable() {
        rfs := make([]*ResponseFuture, 0)
-       responseTable.Range(func(key, value interface{}) bool {
+       c.responseTable.Range(func(key, value interface{}) bool {
                if resp, ok := value.(*ResponseFuture); ok {
                        if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 
1000) <= time.Now().Unix()*1000 {
                                rfs = append(rfs, resp)
-                               responseTable.Delete(key)
+                               c.responseTable.Delete(key)
                        }
                }
                return true
@@ -146,11 +164,11 @@ func ScanResponseTable() {
        }
 }
 
-func connect(addr string) (net.Conn, error) {
+func (c *RemotingClient) connect(addr string) (net.Conn, error) {
        //it needs additional locker.
        connectionLocker.Lock()
        defer connectionLocker.Unlock()
-       conn, ok := connectionTable.Load(addr)
+       conn, ok := c.connectionTable.Load(addr)
        if ok {
                return conn.(net.Conn), nil
        }
@@ -158,28 +176,27 @@ func connect(addr string) (net.Conn, error) {
        if err != nil {
                return nil, err
        }
-       connectionTable.Store(addr, tcpConn)
-       go receiveResponse(tcpConn)
+       c.connectionTable.Store(addr, tcpConn)
+       go c.receiveResponse(tcpConn)
        return tcpConn, nil
 }
 
-func receiveResponse(r net.Conn) {
-       scanner := createScanner(r)
-       for {
-               scanner.Scan()
-               receivedRemotingCommand, err := decode(scanner.Bytes())
+func (c *RemotingClient) receiveResponse(r net.Conn) {
+       scanner := c.createScanner(r)
+       for scanner.Scan() {
+               cmd, err := decode(scanner.Bytes())
                if err != nil {
-                       closeConnection(r)
-                       rlog.Error(err.Error())
+                       c.closeConnection(r)
+                       rlog.Errorf("decode RemotingCommand error: %s", 
err.Error())
                        break
                }
-               if receivedRemotingCommand.isResponseType() {
-                       resp, exist := 
responseTable.Load(receivedRemotingCommand.Opaque)
+               if cmd.isResponseType() {
+                       resp, exist := c.responseTable.Load(cmd.Opaque)
                        if exist {
-                               
responseTable.Delete(receivedRemotingCommand.Opaque)
+                               c.responseTable.Delete(cmd.Opaque)
                                responseFuture := resp.(*ResponseFuture)
                                go func() {
-                                       responseFuture.ResponseCommand = 
receivedRemotingCommand
+                                       responseFuture.ResponseCommand = cmd
                                        responseFuture.executeInvokeCallback()
                                        if responseFuture.Done != nil {
                                                responseFuture.Done <- true
@@ -187,12 +204,30 @@ func receiveResponse(r net.Conn) {
                                }()
                        }
                } else {
-                       // todo handler request from peer
+                       f := c.processors[cmd.Code]
+                       if f != nil {
+                               go func() { // run in a fresh goroutine: handling inline on the single read goroutine can deadlock
+                                       res := f(cmd)
+                                       if res != nil {
+                                               err := c.sendRequest(r, res)
+                                               if err != nil {
+                                                       rlog.Warnf("send 
response to broker error: %s, type is: %d", err, res.Code)
+                                               }
+                                       }
+                               }()
+                       } else {
+                               rlog.Warnf("receive broker's requests, but no 
func to handle, code is: %d", cmd.Code)
+                       }
                }
        }
+       if scanner.Err() != nil {
+               rlog.Errorf("net: %s scanner exit, err: %s.", 
r.RemoteAddr().String(), scanner.Err())
+       } else {
+               rlog.Infof("net: %s scanner exit.", r.RemoteAddr().String())
+       }
 }
 
-func createScanner(r io.Reader) *bufio.Scanner {
+func (c *RemotingClient) createScanner(r io.Reader) *bufio.Scanner {
        scanner := bufio.NewScanner(r)
        scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
                defer func() {
@@ -219,27 +254,26 @@ func createScanner(r io.Reader) *bufio.Scanner {
        return scanner
 }
 
-func sendRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *RemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) 
error {
        content, err := encode(request)
        if err != nil {
                return err
        }
        _, err = conn.Write(content)
        if err != nil {
-               closeConnection(conn)
+               c.closeConnection(conn)
                return err
        }
        return nil
 }
 
-func closeConnection(toCloseConn net.Conn) {
-       connectionTable.Range(func(key, value interface{}) bool {
+func (c *RemotingClient) closeConnection(toCloseConn net.Conn) {
+       c.connectionTable.Range(func(key, value interface{}) bool {
                if value == toCloseConn {
-                       connectionTable.Delete(key)
+                       c.connectionTable.Delete(key)
                        return false
                } else {
                        return true
                }
-
        })
 }
diff --git a/rlog/log.go b/rlog/log.go
index 4f5adaf..d9a1928 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -17,9 +17,7 @@
 
 package rlog
 
-import (
-       "github.com/sirupsen/logrus"
-)
+import "github.com/sirupsen/logrus"
 
 type Logger interface {
        Debug(i ...interface{})
@@ -34,7 +32,13 @@ type Logger interface {
        Fatalf(format string, args ...interface{})
 }
 
-var rLog Logger = logrus.New()
+var rLog Logger
+
+func init() {
+       r := logrus.New()
+       //r.SetLevel(logrus.DebugLevel)
+       rLog = r
+}
 
 func SetLogger(log Logger) {
        rLog = log
diff --git a/utils/helper.go b/utils/helper.go
index 1ad0441..c5495b0 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -20,9 +20,7 @@ package utils
 import (
        "bytes"
        "encoding/binary"
-       "errors"
        "fmt"
-       "net"
        "os"
        "sync"
        "time"
@@ -63,29 +61,6 @@ func updateTimestamp() {
        nextTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, 
time.Local).AddDate(0, 1, 0).Unix())
 }
 
-func LocalIP() []byte {
-       ip, err := clientIP4()
-       if err != nil {
-               return []byte{0, 0, 0, 0}
-       }
-       return ip
-}
-
-func clientIP4() ([]byte, error) {
-       addrs, err := net.InterfaceAddrs()
-       if err != nil {
-               return nil, errors.New("unexpected IP address")
-       }
-       for _, addr := range addrs {
-               if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() 
{
-                       if ip4 := ipnet.IP.To4(); ip4 != nil {
-                               return ip4, nil
-                       }
-               }
-       }
-       return nil, errors.New("unknown IP address")
-}
-
 func GetAddressByBytes(data []byte) string {
        return "127.0.0.1"
 }
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 967b0ae..ee55cd2 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -26,15 +26,6 @@ func TestClassLoaderID(t *testing.T) {
        }
 }
 
-func TestLocalIP(t *testing.T) {
-       ip := LocalIP()
-       if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
-               t.Errorf("failed to get host public ip4 address")
-       } else {
-               t.Logf("ip4 address: %v", ip)
-       }
-}
-
 func BenchmarkMessageClientID(b *testing.B) {
        for i := 0; i < b.N; i++ {
                MessageClientID()
diff --git a/utils/net.go b/utils/net.go
new file mode 100644
index 0000000..5da1edc
--- /dev/null
+++ b/utils/net.go
@@ -0,0 +1,30 @@
+package utils
+
+import (
+       "errors"
+       "fmt"
+       "net"
+)
+
+func LocalIP() string {
+       ip, err := clientIP4()
+       if err != nil {
+               return ""
+       }
+       return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
+}
+
+func clientIP4() ([]byte, error) {
+       addrs, err := net.InterfaceAddrs()
+       if err != nil {
+               return nil, errors.New("unexpected IP address")
+       }
+       for _, addr := range addrs {
+               if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() 
{
+                       if ip4 := ipnet.IP.To4(); ip4 != nil {
+                               return ip4, nil
+                       }
+               }
+       }
+       return nil, errors.New("unknown IP address")
+}
diff --git a/utils/net_test.go b/utils/net_test.go
new file mode 100644
index 0000000..9f76062
--- /dev/null
+++ b/utils/net_test.go
@@ -0,0 +1,7 @@
+package utils
+
+import "testing"
+
+func TestLocalIP2(t *testing.T) {
+       t.Log(LocalIP())
+}

Reply via email to