This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new e673b0e [ISSUE #105] Add PullConsumer. resolve #105 (#106)
e673b0e is described below
commit e673b0ef82247ddc9105b6c2d9889163b300a226
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Jul 10 14:58:13 2019 +0800
[ISSUE #105] Add PullConsumer. resolve #105 (#106)
* add PullConsumer. resolve #105
* fix code review.
---
api.go | 8 +-
consumer/consumer.go | 166 ++++++++++++++++++++++++++-----
consumer/option.go | 10 +-
consumer/pull_consumer.go | 206 +++++++++++++++++++++++++--------------
consumer/push_consumer.go | 36 +++----
examples/consumer/pull/main.go | 65 ++++++++++++
examples/consumer/simple/main.go | 2 +-
internal/client.go | 9 +-
internal/remote/future.go | 4 +-
internal/remote/remote_client.go | 6 --
utils/errors.go | 10 ++
11 files changed, 388 insertions(+), 134 deletions(-)
diff --git a/api.go b/api.go
index ecd6ab8..eec98bf 100644
--- a/api.go
+++ b/api.go
@@ -54,7 +54,11 @@ type PullConsumer interface {
Shutdown() error
Pull(context.Context, string, consumer.MessageSelector, int)
(*primitive.PullResult, error)
PullFrom(context.Context, *primitive.MessageQueue, int64, int)
(*primitive.PullResult, error)
- // only update in memory
- UpdateOffset(primitive.MessageQueue, int64) error
+ CurrentOffset(*primitive.MessageQueue) (int64, error)
+ UpdateOffset(*primitive.MessageQueue, int64) error
PersistOffset(context.Context) error
}
+
+func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
+ return consumer.NewPullConsumer(opts...)
+}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 5bd9674..cff84ba 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -18,13 +18,13 @@ limitations under the License.
package consumer
import (
+ "context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
- "sync/atomic"
"time"
"github.com/apache/rocketmq-client-go/internal"
@@ -32,6 +32,7 @@ import (
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
+ "github.com/pkg/errors"
"github.com/tidwall/gjson"
)
@@ -64,6 +65,11 @@ const (
_SubAll = "*"
)
+var(
+ ErrCreated = errors.New("consumer group has been created")
+ ErrBrokerNotFound = errors.New("broker can not found")
+)
+
// Message model defines the way how messages are delivered to each consumer
clients.
// </p>
//
@@ -247,11 +253,42 @@ type defaultConsumer struct {
prCh chan PullRequest
}
-func (dc *defaultConsumer) persistConsumerOffset() {
+func (dc *defaultConsumer) start() error {
+
+ if dc.model == Clustering {
+ // set retry topic
+ retryTopic := internal.GetRetryTopic(dc.consumerGroup)
+ sub := buildSubscriptionData(retryTopic, MessageSelector{TAG,
_SubAll})
+ dc.subscriptionDataTable.Store(retryTopic, sub)
+ }
+
+ dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions)
+ if dc.model == Clustering {
+ dc.option.ChangeInstanceNameToPID()
+ dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client)
+ } else {
+ dc.storage = NewLocalFileOffsetStore(dc.consumerGroup,
dc.client.ClientID())
+ }
+
+ dc.client.UpdateTopicRouteInfo()
+ dc.client.Start()
+ dc.state = internal.StateRunning
+
+ return nil
+}
+
+func (dc *defaultConsumer) shutdown() error {
+ dc.state = internal.StateRunning
+ dc.client.Shutdown()
+
+ return nil
+}
+
+
+func (dc *defaultConsumer) persistConsumerOffset() error {
err := dc.makeSureStateOK()
if err != nil {
- rlog.Errorf("consumer state error: %s", err.Error())
- return
+ return err
}
mqs := make([]*primitive.MessageQueue, 0)
dc.processQueueTable.Range(func(key, value interface{}) bool {
@@ -259,6 +296,22 @@ func (dc *defaultConsumer) persistConsumerOffset() {
return true
})
dc.storage.persist(mqs)
+ return nil
+}
+
+func (c *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset
int64) error {
+ c.storage.update(queue, offset, false)
+ return nil
+}
+
+func (dc *defaultConsumer) subscriptionAutomatically(topic string) {
+ _, exist := dc.subscriptionDataTable.Load(topic)
+ if !exist {
+ s := MessageSelector{
+ Expression: _SubAll,
+ }
+ dc.subscriptionDataTable.Store(topic,
buildSubscriptionData(topic, s))
+ }
}
func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs
[]*primitive.MessageQueue) {
@@ -671,6 +724,89 @@ func (dc *defaultConsumer) computePullFromWhere(mq
*primitive.MessageQueue) int6
return result
}
+func (dc *defaultConsumer) pullInner(ctx context.Context, queue
*primitive.MessageQueue, data *internal.SubscriptionData,
+ offset int64, numbers int, sysFlag int32, commitOffsetValue int64)
(*primitive.PullResult, error) {
+
+ brokerResult := tryFindBroker(queue)
+ if brokerResult == nil {
+ rlog.Warnf("no broker found for %s", queue.String())
+ return nil, ErrBrokerNotFound
+ }
+
+ if brokerResult.Slave {
+ sysFlag = clearCommitOffsetFlag(sysFlag)
+ }
+
+ if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion <
internal.V4_1_0 {
+ return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to
support for filter message by %v",
+ queue.BrokerName, brokerResult.BrokerVersion,
data.ExpType)
+ }
+
+ pullRequest := &internal.PullMessageRequest{
+ ConsumerGroup: dc.consumerGroup,
+ Topic: queue.Topic,
+ QueueId: int32(queue.QueueId),
+ QueueOffset: offset,
+ MaxMsgNums: int32(numbers),
+ SysFlag: sysFlag,
+ CommitOffset: commitOffsetValue,
+ // TODO: 和java对齐
+ SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+ SubExpression: data.SubString,
+ // TODO: add subversion
+ ExpressionType: string(data.ExpType),
+ }
+
+ if data.ExpType == string(TAG) {
+ pullRequest.SubVersion = 0
+ } else {
+ pullRequest.SubVersion = data.SubVersion
+ }
+
+ // TODO: add computPullFromWhichFilterServer
+
+ return dc.client.PullMessage(context.Background(),
brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue,
result *primitive.PullResult, data *internal.SubscriptionData) {
+
+ updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+
+ switch result.Status {
+ case primitive.PullFound:
+ result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+ msgs := result.GetMessageExts()
+
+ // filter message according to tags
+ msgListFilterAgain := msgs
+ if len(data.Tags) > 0 && data.ClassFilterMode {
+ msgListFilterAgain = make([]*primitive.MessageExt,
len(msgs))
+ for _, msg := range msgs {
+ _, exist := data.Tags[msg.GetTags()]
+ if exist {
+ msgListFilterAgain =
append(msgListFilterAgain, msg)
+ }
+ }
+ }
+
+ // TODO: add filter message hook
+ for _, msg := range msgListFilterAgain {
+ traFlag, _ :=
strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
+ if traFlag {
+ msg.TransactionId =
msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
+ }
+
+ if msg.Properties == nil {
+ msg.Properties = make(map[string]string)
+ }
+ msg.Properties[primitive.PropertyMinOffset] =
strconv.FormatInt(result.MinOffset, 10)
+ msg.Properties[primitive.PropertyMaxOffset] =
strconv.FormatInt(result.MaxOffset, 10)
+ }
+
+ result.SetMessageExts(msgListFilterAgain)
+ }
+}
+
func (dc *defaultConsumer) findConsumerList(topic string) []string {
brokerAddr := internal.FindBrokerAddrByTopic(topic)
if brokerAddr == "" {
@@ -728,6 +864,10 @@ func (dc *defaultConsumer) queryMaxOffset(mq
*primitive.MessageQueue) (int64, er
return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}
+func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) (int64) {
+ return dc.storage.read(mq, _ReadMemoryThenStore)
+}
+
// SearchOffsetByTimestamp with specific queueId and topic
func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue,
timestamp int64) (int64, error) {
brokerAddr := internal.FindBrokerAddrByName(mq.BrokerName)
@@ -786,24 +926,6 @@ func buildSubscriptionData(topic string, selector
MessageSelector) *internal.Sub
return subData
}
-func getNextQueueOf(topic string) *primitive.MessageQueue {
- queues, err := internal.FetchSubscribeMessageQueues(topic)
- if err != nil && len(queues) > 0 {
- rlog.Error(err.Error())
- return nil
- }
- var index int64
- v, exist := queueCounterTable.Load(topic)
- if !exist {
- index = -1
- queueCounterTable.Store(topic, 0)
- } else {
- index = v.(int64)
- }
-
- return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
-}
-
func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32
{
var flag int32 = 0
if commitOffset {
diff --git a/consumer/option.go b/consumer/option.go
index 2a826e8..67bcaec 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -113,6 +113,14 @@ func defaultPushConsumerOptions() consumerOptions {
type Option func(*consumerOptions)
+func defaultPullConsumerOptions() consumerOptions {
+ opts := consumerOptions{
+ ClientOptions: internal.DefaultClientOptions(),
+ }
+ opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
+ return opts
+}
+
func WithConsumerModel(m MessageModel) Option {
return func(options *consumerOptions) {
options.ConsumerModel = m
@@ -173,4 +181,4 @@ func WithRetry(retries int) Option {
return func(opts *consumerOptions) {
opts.RetryTimes = retries
}
-}
+}
\ No newline at end of file
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index c8bcdc5..19e2369 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -19,19 +19,38 @@ package consumer
import (
"context"
- "errors"
"fmt"
- "strconv"
"sync"
+ "sync/atomic"
"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+ "github.com/pkg/errors"
)
type PullConsumer interface {
+ // Start
Start()
+
+ // Shutdown refuse all new pull operation, finish all submitted.
Shutdown()
+
+ // Pull pull message of topic, selector indicate which queue to pull.
Pull(ctx context.Context, topic string, selector MessageSelector,
numbers int) (*primitive.PullResult, error)
+
+ // PullFrom pull messages of queue from the offset to offset + numbers
+ PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset
int64, numbers int) (*primitive.PullResult, error)
+
+ // updateOffset update offset of queue in mem
+ UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+ // PersistOffset persist all offset in mem.
+ PersistOffset(ctx context.Context) error
+
+ // CurrentOffset return the current offset of queue in mem.
+ CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}
var (
@@ -39,20 +58,60 @@ var (
)
type defaultPullConsumer struct {
- state internal.ServiceState
+ *defaultConsumer
+
option consumerOptions
client internal.RMQClient
GroupName string
Model MessageModel
UnitMode bool
+
+ interceptor primitive.Interceptor
}
-func (c *defaultPullConsumer) Start() {
+func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
+ defaultOpts := defaultPullConsumerOptions()
+ for _, apply := range options {
+ apply(&defaultOpts)
+ }
+
+ srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs...)
+ if err != nil {
+ return nil, errors.Wrap(err, "new Namesrv failed.")
+ }
+ internal.RegisterNamsrv(srvs)
+
+ dc := &defaultConsumer{
+ consumerGroup: defaultOpts.GroupName,
+ cType: _PullConsume,
+ state: internal.StateCreateJust,
+ prCh: make(chan PullRequest, 4),
+ model: defaultOpts.ConsumerModel,
+ option: defaultOpts,
+ }
+
+ c := &defaultPullConsumer{
+ defaultConsumer: dc,
+ }
+ return c, nil
+}
+
+func (c *defaultPullConsumer) Start() error {
c.state = internal.StateRunning
+
+ var err error
+ c.once.Do(func() {
+ err = c.start()
+ if err != nil {
+ return
+ }
+ })
+
+ return err
}
func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector
MessageSelector, numbers int) (*primitive.PullResult, error) {
- mq := getNextQueueOf(topic)
+ mq := c.getNextQueueOf(topic)
if mq == nil {
return nil, fmt.Errorf("prepard to pull topic: %s, but no queue
is founded", topic)
}
@@ -64,10 +123,28 @@ func (c *defaultPullConsumer) Pull(ctx context.Context,
topic string, selector M
return nil, err
}
- processPullResult(mq, result, data)
+ c.processPullResult(mq, result, data)
return result, nil
}
+func (c *defaultPullConsumer) getNextQueueOf(topic string)
*primitive.MessageQueue {
+ queues, err := internal.FetchSubscribeMessageQueues(topic)
+ if err != nil && len(queues) > 0 {
+ rlog.Error(err.Error())
+ return nil
+ }
+ var index int64
+ v, exist := queueCounterTable.Load(topic)
+ if !exist {
+ index = -1
+ queueCounterTable.Store(topic, 0)
+ } else {
+ index = v.(int64)
+ }
+
+ return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
+}
+
// SubscribeWithChan ack manually
func (c *defaultPullConsumer) SubscribeWithChan(topic, selector
MessageSelector) (chan *primitive.Message, error) {
return nil, nil
@@ -83,62 +160,46 @@ func (c *defaultPullConsumer) ACK(msg *primitive.Message,
result ConsumeResult)
}
-func (c *defaultPullConsumer) pull(ctx context.Context, mq
*primitive.MessageQueue, data *internal.SubscriptionData,
- offset int64, numbers int) (*primitive.PullResult, error) {
+func (c *defaultConsumer) checkPull(ctx context.Context, mq
*primitive.MessageQueue, offset int64, numbers int) error {
err := c.makeSureStateOK()
if err != nil {
- return nil, err
+ return err
}
if mq == nil {
- return nil, errors.New("MessageQueue is nil")
+ return utils.ErrMQEmpty
}
if offset < 0 {
- return nil, errors.New("offset < 0")
+ return utils.ErrOffset
}
if numbers <= 0 {
- numbers = 1
+ return utils.ErrNumbers
}
- c.subscriptionAutomatically(mq.Topic)
+ return nil
+}
- brokerResult := tryFindBroker(mq)
- if brokerResult == nil {
- return nil, fmt.Errorf("the broker %s does not exist",
mq.BrokerName)
- }
+// TODO: add timeout limit
+// TODO: add hook
+func (c *defaultPullConsumer) pull(ctx context.Context, mq
*primitive.MessageQueue, data *internal.SubscriptionData,
+ offset int64, numbers int) (*primitive.PullResult, error) {
- if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion <
internal.V4_1_0 {
- return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to
support for filter message by %v",
- mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+ if err := c.checkPull(ctx, mq, offset, numbers); err != nil {
+ return nil, err
}
- sysFlag := buildSysFlag(false, true, true, false)
+ c.subscriptionAutomatically(mq.Topic)
- if brokerResult.Slave {
- sysFlag = clearCommitOffsetFlag(sysFlag)
- }
- pullRequest := &internal.PullMessageRequest{
- ConsumerGroup: c.GroupName,
- Topic: mq.Topic,
- QueueId: int32(mq.QueueId),
- QueueOffset: offset,
- MaxMsgNums: int32(numbers),
- SysFlag: sysFlag,
- CommitOffset: 0,
- SuspendTimeoutMillis: _BrokerSuspendMaxTime,
- SubExpression: data.SubString,
- ExpressionType: string(data.ExpType),
- }
+ sysFlag := buildSysFlag(false, true, true, false)
- if data.ExpType == string(TAG) {
- pullRequest.SubVersion = 0
- } else {
- pullRequest.SubVersion = data.SubVersion
+ pullResp, err := c.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0)
+ if err != nil {
+ return nil, err
}
+ c.processPullResult(mq, pullResp, data)
- // TODO computePullFromWhichFilterServer
- return c.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+ return pullResp, err
}
func (c *defaultPullConsumer) makeSureStateOK() error {
@@ -148,42 +209,39 @@ func (c *defaultPullConsumer) makeSureStateOK() error {
return nil
}
-func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
- // TODO
+func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue)
int64 {
+ return c.computePullFromWhere(queue)
}
-func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue)
int64 {
- return 0
-}
-
-func processPullResult(mq *primitive.MessageQueue, result
*primitive.PullResult, data *internal.SubscriptionData) {
- updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
- switch result.Status {
- case primitive.PullFound:
- msgs := result.GetMessageExts()
- msgListFilterAgain := msgs
- if len(data.Tags) > 0 && data.ClassFilterMode {
- msgListFilterAgain = make([]*primitive.MessageExt,
len(msgs))
- for _, msg := range msgs {
- _, exist := data.Tags[msg.GetTags()]
- if exist {
- msgListFilterAgain =
append(msgListFilterAgain, msg)
- }
- }
- }
+// PullFrom pull messages of queue from the offset to offset + numbers
+func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue
*primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult,
error) {
+ if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
+ return nil, err
+ }
+
+ selector := MessageSelector{}
+ data := buildSubscriptionData(queue.Topic, selector)
- // TODO hook
+ return c.pull(ctx, queue, data, offset, numbers)
+}
- for _, msg := range msgListFilterAgain {
- traFlag, _ :=
strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
- if traFlag {
- msg.TransactionId =
msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
- }
+// updateOffset update offset of queue in mem
+func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue,
offset int64) error {
+ return c.updateOffset(queue, offset)
+}
- msg.Properties[primitive.PropertyMinOffset] =
strconv.FormatInt(result.MinOffset, 10)
- msg.Properties[primitive.PropertyMaxOffset] =
strconv.FormatInt(result.MaxOffset, 10)
- }
+// PersistOffset persist all offset in mem.
+func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
+ return c.persistConsumerOffset()
+}
- result.SetMessageExts(msgListFilterAgain)
- }
+// CurrentOffset return the current offset of queue in mem.
+func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue)
(int64, error) {
+ v := c.queryOffset(queue)
+ return v, nil
+}
+
+// Shutdown close defaultConsumer, refuse new request.
+func (c *defaultPullConsumer) Shutdown() error {
+ return c.defaultConsumer.shutdown()
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c5641a5..9000651 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -126,20 +126,18 @@ func (pc *pushConsumer) Start() error {
pc.state = internal.StateStartFailed
pc.validate()
- if pc.model == Clustering {
- // set retry topic
- retryTopic := internal.GetRetryTopic(pc.consumerGroup)
- pc.subscriptionDataTable.Store(retryTopic,
buildSubscriptionData(retryTopic,
- MessageSelector{TAG, _SubAll}))
+ err = pc.defaultConsumer.start()
+ if err != nil {
+ return
}
- pc.client =
internal.GetOrNewRocketMQClient(pc.option.ClientOptions)
- if pc.model == Clustering {
- pc.option.ChangeInstanceNameToPID()
- pc.storage = NewRemoteOffsetStore(pc.consumerGroup,
pc.client)
- } else {
- pc.storage = NewLocalFileOffsetStore(pc.consumerGroup,
pc.client.ClientID())
+ err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
+ if err != nil {
+ pc.state = internal.StateStartFailed
+ rlog.Errorf("the consumer group: [%s] has been created,
specify another name.", pc.consumerGroup)
+ err = ErrCreated
}
+
go func() {
// todo start clean msg expired
// TODO quit
@@ -151,16 +149,6 @@ func (pc *pushConsumer) Start() error {
}
}()
- err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
- if err != nil {
- pc.state = internal.StateCreateJust
- rlog.Errorf("the consumer group: [%s] has been created,
specify another name.", pc.consumerGroup)
- err = errors.New("consumer group has been created")
- return
- }
- pc.client.UpdateTopicRouteInfo()
- pc.client.Start()
- pc.state = internal.StateRunning
})
pc.client.UpdateTopicRouteInfo()
@@ -201,8 +189,8 @@ func (pc *pushConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
}
-func (pc *pushConsumer) PersistConsumerOffset() {
- pc.defaultConsumer.persistConsumerOffset()
+func (pc *pushConsumer) PersistConsumerOffset() error {
+ return pc.defaultConsumer.persistConsumerOffset()
}
func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs
[]*primitive.MessageQueue) {
@@ -651,7 +639,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
msgs := req.([]*primitive.MessageExt)
r, e := pc.consume(ctx, msgs...)
- realReply := reply.(ConsumeResultHolder)
+ realReply :=
reply.(*ConsumeResultHolder)
realReply.ConsumeResult = r
return e
})
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
new file mode 100644
index 0000000..4076450
--- /dev/null
+++ b/examples/consumer/pull/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+)
+
+func main() {
+ c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}))
+ if err != nil{
+ rlog.Fatal("fail to new pullConsumer: ", err)
+ }
+ c.Start()
+
+ ctx := context.Background()
+ queue := &primitive.MessageQueue{
+ Topic: "TopicTest",
+ BrokerName: "", // replace with your broker name. otherwise,
pull will failed.
+ QueueId: 0,
+ }
+
+ offset := int64(0)
+ for {
+ resp, err := c.PullFrom(ctx, queue, offset, 10)
+ if err != nil {
+ if err == utils.ErrRequestTimeout {
+ fmt.Printf("timeout \n")
+ time.Sleep(1 *time.Second)
+ continue
+ }
+ fmt.Printf("unexpectable err: %v \n", err)
+ return
+ }
+ if resp.Status == primitive.PullFound {
+ fmt.Printf("pull message success. nextOffset: %d \n",
resp.NextBeginOffset)
+ for _, msg := range resp.GetMessageExts() {
+ fmt.Printf("pull msg: %v \n", msg)
+ }
+ }
+ offset = resp.NextBeginOffset
+ }
+}
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 43947f9..2ddc465 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -33,7 +33,7 @@ func main() {
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
- err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx
context.Context,
+ err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx
context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
diff --git a/internal/client.go b/internal/client.go
index 1db43c7..09b7ca3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -76,7 +76,7 @@ type InnerProducer interface {
}
type InnerConsumer interface {
- PersistConsumerOffset()
+ PersistConsumerOffset() error
UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
IsSubscribeTopicNeedUpdate(topic string) bool
SubscriptionDataList() []*SubscriptionData
@@ -217,7 +217,10 @@ func (c *rmqClient) Start() {
for !c.close {
c.consumerMap.Range(func(key, value
interface{}) bool {
consumer := value.(InnerConsumer)
- consumer.PersistConsumerOffset()
+ err := consumer.PersistConsumerOffset()
+ if err != nil {
+ rlog.Errorf("persist offset
failed. err: %v", err)
+ }
return true
})
time.Sleep(_PersistOffset)
@@ -305,7 +308,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
hbData.ProducerDatas = pData
hbData.ConsumerDatas = cData
if len(pData) == 0 && len(cData) == 0 {
- rlog.Info("sending heartbeat, but no consumer and no consumer")
+ rlog.Info("sending heartbeat, but no producer and no consumer")
return
}
brokerAddressesMap.Range(func(key, value interface{}) bool {
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 93990e5..5a1c724 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -20,6 +20,8 @@ package remote
import (
"sync"
"time"
+
+ "github.com/apache/rocketmq-client-go/utils"
)
// ResponseFuture
@@ -71,7 +73,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand,
error) {
cmd, err = r.ResponseCommand, r.Err
goto done
case <-timer.C:
- err = ErrRequestTimeout
+ err = utils.ErrRequestTimeout
r.Err = err
goto done
}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 1d0330a..8e32216 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -20,7 +20,6 @@ import (
"bufio"
"bytes"
"encoding/binary"
- "errors"
"io"
"net"
"sync"
@@ -29,11 +28,6 @@ import (
"github.com/apache/rocketmq-client-go/rlog"
)
-var (
- //ErrRequestTimeout for request timeout error
- ErrRequestTimeout = errors.New("request timeout")
-)
-
type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
type TcpOption struct {
diff --git a/utils/errors.go b/utils/errors.go
index a3c7ead..507d7bb 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -19,6 +19,16 @@ package utils
import (
"github.com/apache/rocketmq-client-go/rlog"
+ "github.com/pkg/errors"
+)
+
+var(
+ // ErrRequestTimeout for request timeout error
+ ErrRequestTimeout = errors.New("request timeout")
+
+ ErrMQEmpty = errors.New("MessageQueue is nil")
+ ErrOffset = errors.New("offset < 0")
+ ErrNumbers = errors.New("numbers < 0")
)
func CheckError(action string, err error) {