This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new e673b0e  [ISSUE #105] Add PullConsumer. resolve #105 (#106)
e673b0e is described below

commit e673b0ef82247ddc9105b6c2d9889163b300a226
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Jul 10 14:58:13 2019 +0800

    [ISSUE #105] Add PullConsumer. resolve #105 (#106)
    
    * add PullConsumer. resolve #105
    
    * fix code review.
---
 api.go                           |   8 +-
 consumer/consumer.go             | 166 ++++++++++++++++++++++++++-----
 consumer/option.go               |  10 +-
 consumer/pull_consumer.go        | 206 +++++++++++++++++++++++++--------------
 consumer/push_consumer.go        |  36 +++----
 examples/consumer/pull/main.go   |  65 ++++++++++++
 examples/consumer/simple/main.go |   2 +-
 internal/client.go               |   9 +-
 internal/remote/future.go        |   4 +-
 internal/remote/remote_client.go |   6 --
 utils/errors.go                  |  10 ++
 11 files changed, 388 insertions(+), 134 deletions(-)

diff --git a/api.go b/api.go
index ecd6ab8..eec98bf 100644
--- a/api.go
+++ b/api.go
@@ -54,7 +54,11 @@ type PullConsumer interface {
        Shutdown() error
        Pull(context.Context, string, consumer.MessageSelector, int) 
(*primitive.PullResult, error)
        PullFrom(context.Context, *primitive.MessageQueue, int64, int) 
(*primitive.PullResult, error)
-       // only update in memory
-       UpdateOffset(primitive.MessageQueue, int64) error
+       CurrentOffset(*primitive.MessageQueue) (int64, error)
+       UpdateOffset(*primitive.MessageQueue, int64) error
        PersistOffset(context.Context) error
 }
+
+func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
+       return consumer.NewPullConsumer(opts...)
+}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 5bd9674..cff84ba 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -18,13 +18,13 @@ limitations under the License.
 package consumer
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "sort"
        "strconv"
        "strings"
        "sync"
-       "sync/atomic"
        "time"
 
        "github.com/apache/rocketmq-client-go/internal"
@@ -32,6 +32,7 @@ import (
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
+       "github.com/pkg/errors"
        "github.com/tidwall/gjson"
 )
 
@@ -64,6 +65,11 @@ const (
        _SubAll = "*"
 )
 
+var(
+       ErrCreated = errors.New("consumer group has been created")
+       ErrBrokerNotFound = errors.New("broker can not found")
+)
+
 // Message model defines the way how messages are delivered to each consumer 
clients.
 // </p>
 //
@@ -247,11 +253,42 @@ type defaultConsumer struct {
        prCh chan PullRequest
 }
 
-func (dc *defaultConsumer) persistConsumerOffset() {
+func (dc *defaultConsumer) start() error {
+
+       if dc.model == Clustering {
+               // set retry topic
+               retryTopic := internal.GetRetryTopic(dc.consumerGroup)
+               sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, 
_SubAll})
+               dc.subscriptionDataTable.Store(retryTopic, sub)
+       }
+
+       dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions)
+       if dc.model == Clustering {
+               dc.option.ChangeInstanceNameToPID()
+               dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client)
+       } else {
+               dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, 
dc.client.ClientID())
+       }
+
+       dc.client.UpdateTopicRouteInfo()
+       dc.client.Start()
+       dc.state = internal.StateRunning
+
+       return nil
+}
+
+func (dc *defaultConsumer) shutdown() error {
+       dc.state = internal.StateRunning
+       dc.client.Shutdown()
+
+       return nil
+}
+
+
+func (dc *defaultConsumer) persistConsumerOffset() error {
        err := dc.makeSureStateOK()
        if err != nil {
-               rlog.Errorf("consumer state error: %s", err.Error())
-               return
+               return err
        }
        mqs := make([]*primitive.MessageQueue, 0)
        dc.processQueueTable.Range(func(key, value interface{}) bool {
@@ -259,6 +296,22 @@ func (dc *defaultConsumer) persistConsumerOffset() {
                return true
        })
        dc.storage.persist(mqs)
+       return nil
+}
+
+func (c *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset 
int64) error {
+       c.storage.update(queue, offset, false)
+       return nil
+}
+
+func (dc *defaultConsumer) subscriptionAutomatically(topic string) {
+       _, exist := dc.subscriptionDataTable.Load(topic)
+       if !exist {
+               s := MessageSelector{
+                       Expression: _SubAll,
+               }
+               dc.subscriptionDataTable.Store(topic, 
buildSubscriptionData(topic, s))
+       }
 }
 
 func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
@@ -671,6 +724,89 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*primitive.MessageQueue) int6
        return result
 }
 
+func (dc *defaultConsumer) pullInner(ctx context.Context, queue 
*primitive.MessageQueue, data *internal.SubscriptionData,
+       offset int64, numbers int, sysFlag int32, commitOffsetValue int64) 
(*primitive.PullResult, error) {
+
+       brokerResult := tryFindBroker(queue)
+       if brokerResult == nil {
+               rlog.Warnf("no broker found for %s", queue.String())
+               return nil, ErrBrokerNotFound
+       }
+
+       if brokerResult.Slave {
+               sysFlag = clearCommitOffsetFlag(sysFlag)
+       }
+
+       if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < 
internal.V4_1_0 {
+               return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to 
support for filter message by %v",
+                       queue.BrokerName, brokerResult.BrokerVersion, 
data.ExpType)
+       }
+
+       pullRequest := &internal.PullMessageRequest{
+               ConsumerGroup: dc.consumerGroup,
+               Topic:         queue.Topic,
+               QueueId:       int32(queue.QueueId),
+               QueueOffset:   offset,
+               MaxMsgNums:    int32(numbers),
+               SysFlag:       sysFlag,
+               CommitOffset:  commitOffsetValue,
+               // TODO: 和java对齐
+               SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+               SubExpression:        data.SubString,
+               // TODO: add subversion
+               ExpressionType: string(data.ExpType),
+       }
+
+       if data.ExpType == string(TAG) {
+               pullRequest.SubVersion = 0
+       } else {
+               pullRequest.SubVersion = data.SubVersion
+       }
+
+       // TODO: add computPullFromWhichFilterServer
+
+       return dc.client.PullMessage(context.Background(), 
brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, 
result *primitive.PullResult, data *internal.SubscriptionData) {
+
+       updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+
+       switch result.Status {
+       case primitive.PullFound:
+               result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+               msgs := result.GetMessageExts()
+
+               // filter message according to tags
+               msgListFilterAgain := msgs
+               if len(data.Tags) > 0 && data.ClassFilterMode {
+                       msgListFilterAgain = make([]*primitive.MessageExt, 
len(msgs))
+                       for _, msg := range msgs {
+                               _, exist := data.Tags[msg.GetTags()]
+                               if exist {
+                                       msgListFilterAgain = 
append(msgListFilterAgain, msg)
+                               }
+                       }
+               }
+
+               // TODO: add filter message hook
+               for _, msg := range msgListFilterAgain {
+                       traFlag, _ := 
strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
+                       if traFlag {
+                               msg.TransactionId = 
msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
+                       }
+
+                       if msg.Properties == nil {
+                               msg.Properties = make(map[string]string)
+                       }
+                       msg.Properties[primitive.PropertyMinOffset] = 
strconv.FormatInt(result.MinOffset, 10)
+                       msg.Properties[primitive.PropertyMaxOffset] = 
strconv.FormatInt(result.MaxOffset, 10)
+               }
+
+               result.SetMessageExts(msgListFilterAgain)
+       }
+}
+
 func (dc *defaultConsumer) findConsumerList(topic string) []string {
        brokerAddr := internal.FindBrokerAddrByTopic(topic)
        if brokerAddr == "" {
@@ -728,6 +864,10 @@ func (dc *defaultConsumer) queryMaxOffset(mq 
*primitive.MessageQueue) (int64, er
        return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
+func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) (int64) {
+       return dc.storage.read(mq, _ReadMemoryThenStore)
+}
+
 // SearchOffsetByTimestamp with specific queueId and topic
 func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, 
timestamp int64) (int64, error) {
        brokerAddr := internal.FindBrokerAddrByName(mq.BrokerName)
@@ -786,24 +926,6 @@ func buildSubscriptionData(topic string, selector 
MessageSelector) *internal.Sub
        return subData
 }
 
-func getNextQueueOf(topic string) *primitive.MessageQueue {
-       queues, err := internal.FetchSubscribeMessageQueues(topic)
-       if err != nil && len(queues) > 0 {
-               rlog.Error(err.Error())
-               return nil
-       }
-       var index int64
-       v, exist := queueCounterTable.Load(topic)
-       if !exist {
-               index = -1
-               queueCounterTable.Store(topic, 0)
-       } else {
-               index = v.(int64)
-       }
-
-       return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
-}
-
 func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 
{
        var flag int32 = 0
        if commitOffset {
diff --git a/consumer/option.go b/consumer/option.go
index 2a826e8..67bcaec 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -113,6 +113,14 @@ func defaultPushConsumerOptions() consumerOptions {
 
 type Option func(*consumerOptions)
 
+func defaultPullConsumerOptions() consumerOptions {
+       opts := consumerOptions{
+               ClientOptions: internal.DefaultClientOptions(),
+       }
+       opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
+       return opts
+}
+
 func WithConsumerModel(m MessageModel) Option {
        return func(options *consumerOptions) {
                options.ConsumerModel = m
@@ -173,4 +181,4 @@ func WithRetry(retries int) Option {
        return func(opts *consumerOptions) {
                opts.RetryTimes = retries
        }
-}
+}
\ No newline at end of file
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index c8bcdc5..19e2369 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -19,19 +19,38 @@ package consumer
 
 import (
        "context"
-       "errors"
        "fmt"
-       "strconv"
        "sync"
+       "sync/atomic"
 
        "github.com/apache/rocketmq-client-go/internal"
        "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+       "github.com/pkg/errors"
 )
 
 type PullConsumer interface {
+       // Start
        Start()
+
+       // Shutdown refuse all new pull operation, finish all submitted.
        Shutdown()
+
+       // Pull pull message of topic,  selector indicate which queue to pull.
        Pull(ctx context.Context, topic string, selector MessageSelector, 
numbers int) (*primitive.PullResult, error)
+
+       // PullFrom pull messages of queue from the offset to offset + numbers
+       PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
+
+       // updateOffset update offset of queue in mem
+       UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+       // PersistOffset persist all offset in mem.
+       PersistOffset(ctx context.Context) error
+
+       // CurrentOffset return the current offset of queue in mem.
+       CurrentOffset(queue *primitive.MessageQueue) (int64, error)
 }
 
 var (
@@ -39,20 +58,60 @@ var (
 )
 
 type defaultPullConsumer struct {
-       state     internal.ServiceState
+       *defaultConsumer
+
        option    consumerOptions
        client    internal.RMQClient
        GroupName string
        Model     MessageModel
        UnitMode  bool
+
+       interceptor primitive.Interceptor
 }
 
-func (c *defaultPullConsumer) Start() {
+func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
+       defaultOpts := defaultPullConsumerOptions()
+       for _, apply := range options {
+               apply(&defaultOpts)
+       }
+
+       srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs...)
+       if err != nil {
+               return nil, errors.Wrap(err, "new Namesrv failed.")
+       }
+       internal.RegisterNamsrv(srvs)
+
+       dc := &defaultConsumer{
+               consumerGroup: defaultOpts.GroupName,
+               cType:         _PullConsume,
+               state:         internal.StateCreateJust,
+               prCh:          make(chan PullRequest, 4),
+               model:         defaultOpts.ConsumerModel,
+               option:        defaultOpts,
+       }
+
+       c := &defaultPullConsumer{
+               defaultConsumer: dc,
+       }
+       return c, nil
+}
+
+func (c *defaultPullConsumer) Start() error {
        c.state = internal.StateRunning
+
+       var err error
+       c.once.Do(func() {
+               err = c.start()
+               if err != nil {
+                       return
+               }
+       })
+
+       return err
 }
 
 func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector 
MessageSelector, numbers int) (*primitive.PullResult, error) {
-       mq := getNextQueueOf(topic)
+       mq := c.getNextQueueOf(topic)
        if mq == nil {
                return nil, fmt.Errorf("prepard to pull topic: %s, but no queue 
is founded", topic)
        }
@@ -64,10 +123,28 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, 
topic string, selector M
                return nil, err
        }
 
-       processPullResult(mq, result, data)
+       c.processPullResult(mq, result, data)
        return result, nil
 }
 
+func (c *defaultPullConsumer) getNextQueueOf(topic string) 
*primitive.MessageQueue {
+       queues, err := internal.FetchSubscribeMessageQueues(topic)
+       if err != nil && len(queues) > 0 {
+               rlog.Error(err.Error())
+               return nil
+       }
+       var index int64
+       v, exist := queueCounterTable.Load(topic)
+       if !exist {
+               index = -1
+               queueCounterTable.Store(topic, 0)
+       } else {
+               index = v.(int64)
+       }
+
+       return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
+}
+
 // SubscribeWithChan ack manually
 func (c *defaultPullConsumer) SubscribeWithChan(topic, selector 
MessageSelector) (chan *primitive.Message, error) {
        return nil, nil
@@ -83,62 +160,46 @@ func (c *defaultPullConsumer) ACK(msg *primitive.Message, 
result ConsumeResult)
 
 }
 
-func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*primitive.MessageQueue, data *internal.SubscriptionData,
-       offset int64, numbers int) (*primitive.PullResult, error) {
+func (c *defaultConsumer) checkPull(ctx context.Context, mq 
*primitive.MessageQueue, offset int64, numbers int) error {
        err := c.makeSureStateOK()
        if err != nil {
-               return nil, err
+               return err
        }
 
        if mq == nil {
-               return nil, errors.New("MessageQueue is nil")
+               return utils.ErrMQEmpty
        }
 
        if offset < 0 {
-               return nil, errors.New("offset < 0")
+               return utils.ErrOffset
        }
 
        if numbers <= 0 {
-               numbers = 1
+               return utils.ErrNumbers
        }
-       c.subscriptionAutomatically(mq.Topic)
+       return nil
+}
 
-       brokerResult := tryFindBroker(mq)
-       if brokerResult == nil {
-               return nil, fmt.Errorf("the broker %s does not exist", 
mq.BrokerName)
-       }
+// TODO: add timeout limit
+// TODO: add hook
+func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*primitive.MessageQueue, data *internal.SubscriptionData,
+       offset int64, numbers int) (*primitive.PullResult, error) {
 
-       if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < 
internal.V4_1_0 {
-               return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to 
support for filter message by %v",
-                       mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+       if err := c.checkPull(ctx, mq, offset, numbers); err != nil {
+               return nil, err
        }
 
-       sysFlag := buildSysFlag(false, true, true, false)
+       c.subscriptionAutomatically(mq.Topic)
 
-       if brokerResult.Slave {
-               sysFlag = clearCommitOffsetFlag(sysFlag)
-       }
-       pullRequest := &internal.PullMessageRequest{
-               ConsumerGroup:        c.GroupName,
-               Topic:                mq.Topic,
-               QueueId:              int32(mq.QueueId),
-               QueueOffset:          offset,
-               MaxMsgNums:           int32(numbers),
-               SysFlag:              sysFlag,
-               CommitOffset:         0,
-               SuspendTimeoutMillis: _BrokerSuspendMaxTime,
-               SubExpression:        data.SubString,
-               ExpressionType:       string(data.ExpType),
-       }
+       sysFlag := buildSysFlag(false, true, true, false)
 
-       if data.ExpType == string(TAG) {
-               pullRequest.SubVersion = 0
-       } else {
-               pullRequest.SubVersion = data.SubVersion
+       pullResp, err := c.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0)
+       if err != nil {
+               return nil, err
        }
+       c.processPullResult(mq, pullResp, data)
 
-       // TODO computePullFromWhichFilterServer
-       return c.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+       return pullResp, err
 }
 
 func (c *defaultPullConsumer) makeSureStateOK() error {
@@ -148,42 +209,39 @@ func (c *defaultPullConsumer) makeSureStateOK() error {
        return nil
 }
 
-func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
-       // TODO
+func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) 
int64 {
+       return c.computePullFromWhere(queue)
 }
 
-func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) 
int64 {
-       return 0
-}
-
-func processPullResult(mq *primitive.MessageQueue, result 
*primitive.PullResult, data *internal.SubscriptionData) {
-       updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
-       switch result.Status {
-       case primitive.PullFound:
-               msgs := result.GetMessageExts()
-               msgListFilterAgain := msgs
-               if len(data.Tags) > 0 && data.ClassFilterMode {
-                       msgListFilterAgain = make([]*primitive.MessageExt, 
len(msgs))
-                       for _, msg := range msgs {
-                               _, exist := data.Tags[msg.GetTags()]
-                               if exist {
-                                       msgListFilterAgain = 
append(msgListFilterAgain, msg)
-                               }
-                       }
-               }
+// PullFrom pull messages of queue from the offset to offset + numbers
+func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue 
*primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, 
error) {
+       if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
+               return nil, err
+       }
+
+       selector := MessageSelector{}
+       data := buildSubscriptionData(queue.Topic, selector)
 
-               // TODO hook
+       return c.pull(ctx, queue, data, offset, numbers)
+}
 
-               for _, msg := range msgListFilterAgain {
-                       traFlag, _ := 
strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
-                       if traFlag {
-                               msg.TransactionId = 
msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
-                       }
+// updateOffset update offset of queue in mem
+func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, 
offset int64) error {
+       return c.updateOffset(queue, offset)
+}
 
-                       msg.Properties[primitive.PropertyMinOffset] = 
strconv.FormatInt(result.MinOffset, 10)
-                       msg.Properties[primitive.PropertyMaxOffset] = 
strconv.FormatInt(result.MaxOffset, 10)
-               }
+// PersistOffset persist all offset in mem.
+func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
+       return c.persistConsumerOffset()
+}
 
-               result.SetMessageExts(msgListFilterAgain)
-       }
+// CurrentOffset return the current offset of queue in mem.
+func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) 
(int64, error) {
+       v := c.queryOffset(queue)
+       return v, nil
+}
+
+// Shutdown close defaultConsumer, refuse new request.
+func (c *defaultPullConsumer) Shutdown() error {
+       return c.defaultConsumer.shutdown()
 }
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c5641a5..9000651 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -126,20 +126,18 @@ func (pc *pushConsumer) Start() error {
                pc.state = internal.StateStartFailed
                pc.validate()
 
-               if pc.model == Clustering {
-                       // set retry topic
-                       retryTopic := internal.GetRetryTopic(pc.consumerGroup)
-                       pc.subscriptionDataTable.Store(retryTopic, 
buildSubscriptionData(retryTopic,
-                               MessageSelector{TAG, _SubAll}))
+               err = pc.defaultConsumer.start()
+               if err != nil {
+                       return
                }
 
-               pc.client = 
internal.GetOrNewRocketMQClient(pc.option.ClientOptions)
-               if pc.model == Clustering {
-                       pc.option.ChangeInstanceNameToPID()
-                       pc.storage = NewRemoteOffsetStore(pc.consumerGroup, 
pc.client)
-               } else {
-                       pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, 
pc.client.ClientID())
+               err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
+               if err != nil {
+                       pc.state = internal.StateStartFailed
+                       rlog.Errorf("the consumer group: [%s] has been created, 
specify another name.", pc.consumerGroup)
+                       err = ErrCreated
                }
+
                go func() {
                        // todo start clean msg expired
                        // TODO quit
@@ -151,16 +149,6 @@ func (pc *pushConsumer) Start() error {
                        }
                }()
 
-               err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
-               if err != nil {
-                       pc.state = internal.StateCreateJust
-                       rlog.Errorf("the consumer group: [%s] has been created, 
specify another name.", pc.consumerGroup)
-                       err = errors.New("consumer group has been created")
-                       return
-               }
-               pc.client.UpdateTopicRouteInfo()
-               pc.client.Start()
-               pc.state = internal.StateRunning
        })
 
        pc.client.UpdateTopicRouteInfo()
@@ -201,8 +189,8 @@ func (pc *pushConsumer) Rebalance() {
        pc.defaultConsumer.doBalance()
 }
 
-func (pc *pushConsumer) PersistConsumerOffset() {
-       pc.defaultConsumer.persistConsumerOffset()
+func (pc *pushConsumer) PersistConsumerOffset() error {
+       return pc.defaultConsumer.persistConsumerOffset()
 }
 
 func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
@@ -651,7 +639,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        msgs := req.([]*primitive.MessageExt)
                                        r, e := pc.consume(ctx, msgs...)
 
-                                       realReply := reply.(ConsumeResultHolder)
+                                       realReply := 
reply.(*ConsumeResultHolder)
                                        realReply.ConsumeResult = r
                                        return e
                                })
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
new file mode 100644
index 0000000..4076450
--- /dev/null
+++ b/examples/consumer/pull/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+)
+
+func main() {
+       c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"), 
consumer.WithNameServer([]string{"127.0.0.1:9876"}))
+       if err != nil{
+               rlog.Fatal("fail to new pullConsumer: ", err)
+       }
+       c.Start()
+
+       ctx := context.Background()
+       queue := &primitive.MessageQueue{
+               Topic:      "TopicTest",
+               BrokerName: "", // replace with your broker name. otherwise, 
pull will failed.
+               QueueId:    0,
+       }
+
+       offset := int64(0)
+       for {
+               resp, err := c.PullFrom(ctx, queue, offset, 10)
+               if err != nil {
+                       if err == utils.ErrRequestTimeout {
+                               fmt.Printf("timeout \n")
+                               time.Sleep(1 *time.Second)
+                               continue
+                       }
+                       fmt.Printf("unexpectable err: %v \n", err)
+                       return
+               }
+               if resp.Status == primitive.PullFound {
+                       fmt.Printf("pull message success. nextOffset: %d \n", 
resp.NextBeginOffset)
+                       for _, msg := range resp.GetMessageExts() {
+                               fmt.Printf("pull msg: %v \n", msg)
+                       }
+               }
+               offset = resp.NextBeginOffset
+       }
+}
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 43947f9..2ddc465 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -33,7 +33,7 @@ func main() {
                consumer.WithGroupName("testGroup"),
                consumer.WithNameServer([]string{"127.0.0.1:9876"}),
        )
-       err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
context.Context,
+       err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx 
context.Context,
                msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
                fmt.Printf("subscribe callback: %v \n", msgs)
                return consumer.ConsumeSuccess, nil
diff --git a/internal/client.go b/internal/client.go
index 1db43c7..09b7ca3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -76,7 +76,7 @@ type InnerProducer interface {
 }
 
 type InnerConsumer interface {
-       PersistConsumerOffset()
+       PersistConsumerOffset() error
        UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
        IsSubscribeTopicNeedUpdate(topic string) bool
        SubscriptionDataList() []*SubscriptionData
@@ -217,7 +217,10 @@ func (c *rmqClient) Start() {
                        for !c.close {
                                c.consumerMap.Range(func(key, value 
interface{}) bool {
                                        consumer := value.(InnerConsumer)
-                                       consumer.PersistConsumerOffset()
+                                       err := consumer.PersistConsumerOffset()
+                                       if err != nil {
+                                               rlog.Errorf("persist offset 
failed. err: %v", err)
+                                       }
                                        return true
                                })
                                time.Sleep(_PersistOffset)
@@ -305,7 +308,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
        hbData.ProducerDatas = pData
        hbData.ConsumerDatas = cData
        if len(pData) == 0 && len(cData) == 0 {
-               rlog.Info("sending heartbeat, but no consumer and no consumer")
+               rlog.Info("sending heartbeat, but no producer and no consumer")
                return
        }
        brokerAddressesMap.Range(func(key, value interface{}) bool {
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 93990e5..5a1c724 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -20,6 +20,8 @@ package remote
 import (
        "sync"
        "time"
+
+       "github.com/apache/rocketmq-client-go/utils"
 )
 
 // ResponseFuture
@@ -71,7 +73,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, 
error) {
                        cmd, err = r.ResponseCommand, r.Err
                        goto done
                case <-timer.C:
-                       err = ErrRequestTimeout
+                       err = utils.ErrRequestTimeout
                        r.Err = err
                        goto done
                }
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 1d0330a..8e32216 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -20,7 +20,6 @@ import (
        "bufio"
        "bytes"
        "encoding/binary"
-       "errors"
        "io"
        "net"
        "sync"
@@ -29,11 +28,6 @@ import (
        "github.com/apache/rocketmq-client-go/rlog"
 )
 
-var (
-       //ErrRequestTimeout for request timeout error
-       ErrRequestTimeout = errors.New("request timeout")
-)
-
 type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
 
 type TcpOption struct {
diff --git a/utils/errors.go b/utils/errors.go
index a3c7ead..507d7bb 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -19,6 +19,16 @@ package utils
 
 import (
        "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/pkg/errors"
+)
+
+var(
+       // ErrRequestTimeout for request timeout error
+       ErrRequestTimeout = errors.New("request timeout")
+
+       ErrMQEmpty = errors.New("MessageQueue is nil")
+       ErrOffset  = errors.New("offset < 0")
+       ErrNumbers = errors.New("numbers < 0")
 )
 
 func CheckError(action string, err error) {

Reply via email to