xujianhai666 commented on a change in pull request #106: [ISSUE #105] Add 
PullConsumer. resolve #105
URL: https://github.com/apache/rocketmq-client-go/pull/106#discussion_r301902618
 
 

 ##########
 File path: consumer/pull_consumer.go
 ##########
 @@ -19,40 +19,104 @@ package consumer
 
 import (
        "context"
-       "errors"
        "fmt"
-       "strconv"
        "sync"
+       "sync/atomic"
 
        "github.com/apache/rocketmq-client-go/internal"
        "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/pkg/errors"
+)
+
+var (
+       ErrMQEmpty = errors.New("MessageQueue is nil")
+       ErrOffset  = errors.New("offset < 0")
+       ErrNumbers = errors.New("numbers < 0")
 )
 
 type PullConsumer interface {
+       // Start
        Start()
+
+       // Shutdown refuse all new pull operation, finish all submitted.
        Shutdown()
+
+       // Pull pull message of topic,  selector indicate which queue to pull.
        Pull(ctx context.Context, topic string, selector MessageSelector, 
numbers int) (*primitive.PullResult, error)
+
+       // PullFrom pull messages of queue from the offset to offset + numbers
+       PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
+
+       // updateOffset update offset of queue in mem
+       UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+       // PersistOffset persist all offset in mem.
+       PersistOffset(ctx context.Context) error
+
+       // CurrentOffset return the current offset of queue in mem.
+       CurrentOffset(queue *primitive.MessageQueue) (int64, error)
 }
 
 var (
        queueCounterTable sync.Map
 )
 
 type defaultPullConsumer struct {
-       state     internal.ServiceState
+       *defaultConsumer
+
        option    consumerOptions
        client    internal.RMQClient
        GroupName string
        Model     MessageModel
        UnitMode  bool
+
+       interceptor primitive.Interceptor
+}
+
+func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
+       defaultOpts := defaultPullConsumerOptions()
+       for _, apply := range options {
+               apply(&defaultOpts)
+       }
+
+       srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs...)
+       if err != nil {
+               return nil, errors.Wrap(err, "new Namesrv failed.")
+       }
+       internal.RegisterNamsrv(srvs)
+
+       dc := &defaultConsumer{
+               consumerGroup: defaultOpts.GroupName,
+               cType:         _PushConsume,
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to