xujianhai666 commented on a change in pull request #106: [ISSUE #105] Add
PullConsumer. resolve #105
URL: https://github.com/apache/rocketmq-client-go/pull/106#discussion_r301902618
##########
File path: consumer/pull_consumer.go
##########
@@ -19,40 +19,104 @@ package consumer
import (
"context"
- "errors"
"fmt"
- "strconv"
"sync"
+ "sync/atomic"
"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/pkg/errors"
+)
+
+var (
+ ErrMQEmpty = errors.New("MessageQueue is nil")
+ ErrOffset = errors.New("offset < 0")
+ ErrNumbers = errors.New("numbers < 0")
)
type PullConsumer interface {
+ // Start
Start()
+
+ // Shutdown refuses all new pull operations and finishes all submitted ones.
Shutdown()
+
+ // Pull pulls messages of topic; selector indicates which queue to pull from.
Pull(ctx context.Context, topic string, selector MessageSelector,
numbers int) (*primitive.PullResult, error)
+
+ // PullFrom pull messages of queue from the offset to offset + numbers
+ PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset
int64, numbers int) (*primitive.PullResult, error)
+
+ // UpdateOffset updates the offset of queue in memory.
+ UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+ // PersistOffset persist all offset in mem.
+ PersistOffset(ctx context.Context) error
+
+ // CurrentOffset return the current offset of queue in mem.
+ CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}
var (
queueCounterTable sync.Map
)
type defaultPullConsumer struct {
- state internal.ServiceState
+ *defaultConsumer
+
option consumerOptions
client internal.RMQClient
GroupName string
Model MessageModel
UnitMode bool
+
+ interceptor primitive.Interceptor
+}
+
+func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
+ defaultOpts := defaultPullConsumerOptions()
+ for _, apply := range options {
+ apply(&defaultOpts)
+ }
+
+ srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs...)
+ if err != nil {
+ return nil, errors.Wrap(err, "new Namesrv failed.")
+ }
+ internal.RegisterNamsrv(srvs)
+
+ dc := &defaultConsumer{
+ consumerGroup: defaultOpts.GroupName,
+ cType: _PushConsume,
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services