This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 52f5a0c [ISSUE #118]add messaage sendback. resolve #118 (#119)
52f5a0c is described below
commit 52f5a0c54884d73a5e3f0e4516b98b18309b5238
Author: xujianhai666 <[email protected]>
AuthorDate: Thu Jul 18 11:11:39 2019 +0800
[ISSUE #118]add messaage sendback. resolve #118 (#119)
#118
---
consumer/consumer.go | 3 -
consumer/option.go | 8 +++
consumer/process_queue.go | 10 ++++
consumer/push_consumer.go | 95 +++++++++++++++++++++++-------
examples/consumer/retry/concurrent/main.go | 73 +++++++++++++++++++++++
examples/consumer/retry/order/main.go | 72 ++++++++++++++++++++++
internal/client.go | 8 +--
internal/constants.go | 5 +-
internal/request.go | 24 ++++++++
internal/route.go | 39 ++++++++++++
primitive/ctx.go | 27 ++++++++-
11 files changed, 329 insertions(+), 35 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 7f80802..25861f9 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -352,9 +352,6 @@ func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic
string) bool {
func (dc *defaultConsumer) doBalance() {
dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
topic := key.(string)
- if strings.HasPrefix(topic, internal.RetryGroupTopicPrefix) {
- return true
- }
v, exist := dc.topicSubscribeInfoTable.Load(topic)
if !exist {
rlog.Warnf("do balance of group: %s, but topic: %s does
not exist.", dc.consumerGroup, topic)
diff --git a/consumer/option.go b/consumer/option.go
index f967e1e..dafff61 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -194,3 +194,11 @@ func WithCredentials(c primitive.Credentials) Option {
options.ClientOptions.Credentials = c
}
}
+
+// WithMaxReconsumeTimes sets MaxReconsumeTimes of options. If a message is
reconsumed more than MaxReconsumeTimes times, it will
+// be sent to the retry or DLQ topic. For more information, see
examples/consumer/retry.
+func WithMaxReconsumeTimes(times int32) Option {
+ return func(opts *consumerOptions) {
+ opts.MaxReconsumeTimes = times
+ }
+}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 71b52bf..869ea5d 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -110,6 +110,16 @@ func (pq *processQueue) putMessage(messages
...*primitive.MessageExt) {
}
}
+func (pq *processQueue) makeMessageToCosumeAgain(messages
...*primitive.MessageExt) {
+ pq.mutex.Lock()
+ for _, msg := range messages {
+ pq.consumingMsgOrderlyTreeMap.Remove(msg.QueueOffset)
+ pq.msgCache.Put(msg.QueueOffset, msg)
+ }
+
+ pq.mutex.Unlock()
+}
+
func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64
{
result := int64(-1)
pq.mutex.Lock()
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 1b8c7dc..3b011c3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/internal/remote"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/pkg/errors"
@@ -50,10 +51,8 @@ type pushConsumer struct {
consume func(context.Context,
...*primitive.MessageExt) (ConsumeResult, error)
submitToConsume func(*processQueue,
*primitive.MessageQueue)
subscribedTopic map[string]string
-
- interceptor primitive.Interceptor
-
- queueLock *QueueLock
+ interceptor primitive.Interceptor
+ queueLock *QueueLock
}
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -191,6 +190,15 @@ func (pc *pushConsumer) Subscribe(topic string, selector
MessageSelector,
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
+
+ if pc.option.ConsumerModel == Clustering {
+ // add retry topic for clustering mode
+ retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+ data = buildSubscriptionData(retryTopic,
MessageSelector{Expression: _SubAll})
+ pc.subscriptionDataTable.Store(retryTopic, data)
+ pc.subscribedTopic[retryTopic] = ""
+ }
+
pc.consume = f
return nil
}
@@ -512,10 +520,33 @@ func (pc *pushConsumer) correctTagsOffset(pr
*PullRequest) {
// TODO
}
-func (pc *pushConsumer) sendMessageBack(ctx *primitive.ConsumeMessageContext,
msg *primitive.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(brokerName string, msg
*primitive.MessageExt, delayLevel int) bool {
+ var brokerAddr string
+ if len(brokerName) != 0 {
+ brokerAddr = internal.FindBrokerAddrByName(brokerName)
+ } else {
+ brokerAddr = msg.StoreHost
+ }
+ _, err := pc.client.InvokeSync(brokerAddr, pc.buildSendBackRequest(msg,
delayLevel), 3*time.Second)
+ if err != nil {
+ return false
+ }
return true
}
+func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt,
delayLevel int) *remote.RemotingCommand {
+ req := &internal.ConsumerSendMsgBackRequest{
+ Group: pc.consumerGroup,
+ OriginTopic: msg.Topic,
+ Offset: msg.CommitLogOffset,
+ DelayLevel: delayLevel,
+ OriginMsgId: msg.MsgId,
+ MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+ }
+
+ return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req,
msg.Body)
+}
+
func (pc *pushConsumer) suspend() {
pc.pause = true
rlog.Infof("suspend consumer: %s", pc.consumerGroup)
@@ -615,6 +646,23 @@ func (pc *pushConsumer) consumeInner(ctx context.Context,
subMsgs []*primitive.M
}
}
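+// resetRetryAndNamespace modifies retry messages. NOTE(review): the `retryTopic == ""` check below looks inverted (probably meant `!= ""`) - confirm.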
+func (pc *pushConsumer) resetRetryAndNamespace(subMsgs
[]*primitive.MessageExt) {
+ groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
+ beginTime := time.Now()
+ for idx := range subMsgs {
+ msg := subMsgs[idx]
+ if msg.Properties != nil {
+ retryTopic :=
msg.Properties[primitive.PropertyRetryTopic]
+ if retryTopic == "" && groupTopic == msg.Topic {
+ msg.Topic = retryTopic
+ }
+
subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
+ beginTime.UnixNano()/int64(time.Millisecond),
10)
+ }
+ }
+}
+
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq
*primitive.MessageQueue) {
msgs := pq.getMessages()
if msgs == nil {
@@ -640,18 +688,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
// TODO hook
beginTime := time.Now()
- groupTopic := internal.RetryGroupTopicPrefix +
pc.consumerGroup
- for idx := range subMsgs {
- msg := subMsgs[idx]
- if msg.Properties != nil {
- retryTopic :=
msg.Properties[primitive.PropertyRetryTopic]
- if retryTopic == "" && groupTopic ==
msg.Topic {
- msg.Topic = retryTopic
- }
-
subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
-
beginTime.UnixNano()/int64(time.Millisecond), 10)
- }
- }
+ pc.resetRetryAndNamespace(subMsgs)
var result ConsumeResult
var err error
@@ -661,6 +698,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
+ concurrentCtx :=
primitive.NewConsumeConcurrentlyContext()
+ concurrentCtx.MQ = *mq
+ ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
result, err = pc.consumeInner(ctx, subMsgs)
@@ -691,7 +731,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
} else {
for i := 0; i < len(msgs); i++ {
msg := msgs[i]
- if
!pc.sendMessageBack(msgCtx, msg) {
+ if
!pc.sendMessageBack(mq.BrokerName, msg,
concurrentCtx.DelayLevelWhenNextConsume) {
msg.ReconsumeTimes += 1
msgBackFailed =
append(msgBackFailed, msg)
}
@@ -755,6 +795,8 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
batchSize := pc.option.ConsumeMessageBatchMaxSize
msgs := pq.takeMessages(batchSize)
+ pc.resetRetryAndNamespace(msgs)
+
if len(msgs) == 0 {
continueConsume = false
break
@@ -804,6 +846,7 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
commitOffset = pq.commit()
case SuspendCurrentQueueAMoment:
if (pc.checkReconsumeTimes(msgs)) {
+ pq.putMessage(msgs...)
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) *
time.Millisecond)
continueConsume = false;
} else {
@@ -843,12 +886,12 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt) bool
{
suspend := false
if len(msgs) != 0 {
- maxReconsumeTimes := pc.getMaxReconsumeTimes()
+ maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
for _, msg := range msgs {
if msg.ReconsumeTimes > maxReconsumeTimes {
+ rlog.Warn("msg will be send to retry topic due
to ReconsumeTimes > %d, \n", maxReconsumeTimes)
msg.Properties["RECONSUME_TIME"] =
strconv.Itoa(int(msg.ReconsumeTimes))
- if !pc.sendMessageBack(nil, msg) {
- // TODO: complete sendMessageBack
+ if !pc.sendMessageBack("", msg, -1) {
suspend = true
msg.ReconsumeTimes += 1
}
@@ -861,7 +904,7 @@ func (pc *pushConsumer) checkReconsumeTimes(msgs
[]*primitive.MessageExt) bool {
return suspend
}
-func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
+func (pc *pushConsumer) getOrderlyMaxReconsumeTimes() int32 {
if pc.option.MaxReconsumeTimes == -1 {
return math.MaxInt32
} else {
@@ -869,6 +912,14 @@ func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
}
}
+func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
+ if pc.option.MaxReconsumeTimes == -1 {
+ return 16
+ } else {
+ return pc.option.MaxReconsumeTimes
+ }
+}
+
func (pc *pushConsumer) tryLocakLaterAndReconsume(mq *primitive.MessageQueue,
delay int64) {
time.Sleep(time.Duration(delay) * time.Millisecond)
if pc.lock(mq) == true {
diff --git a/examples/consumer/retry/concurrent/main.go
b/examples/consumer/retry/concurrent/main.go
new file mode 100644
index 0000000..8a21f83
--- /dev/null
+++ b/examples/consumer/retry/concurrent/main.go
@@ -0,0 +1,73 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * Use the concurrent consumer model: when the Subscribe callback returns
consumer.ConsumeRetryLater, the message will be
+ * sent to the RocketMQ retry topic. We can set DelayLevelWhenNextConsume in
ConsumeConcurrentlyContext, which is used to
+ * indicate the delay before the message is re-sent from the retry topic to the origin topic.
+ * In this example, we always set DelayLevelWhenNextConsume=1, which means that the
message will be sent to the origin topic after
+ * 1s. To avoid unlimited retries, we return consumer.ConsumeSuccess
once ReconsumeTimes > 5.
+ */
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithConsumerModel(consumer.Clustering),
+ )
+
+ retryLevel := 1 // delay level 1: the message is consumed again 1s later
+ err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx
context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ fmt.Printf("subscribe callback len: %d \n", len(msgs))
+
+ concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
+ concurrentCtx.DelayLevelWhenNextConsume = retryLevel // only
takes effect when consumer.ConsumeRetryLater is returned
+
+ for _, msg := range msgs {
+ if msg.ReconsumeTimes > 5 {
+ fmt.Printf("msg ReconsumeTimes > 5. msg: %v",
msg)
+ return consumer.ConsumeSuccess, nil
+ } else {
+ fmt.Printf("subscribe callback: %v \n", msg)
+ }
+ }
+ return consumer.ConsumeRetryLater, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ // Note: start after subscribe
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+}
diff --git a/examples/consumer/retry/order/main.go
b/examples/consumer/retry/order/main.go
new file mode 100644
index 0000000..f74ef68
--- /dev/null
+++ b/examples/consumer/retry/order/main.go
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * Use the orderly consumer model: when the Subscribe callback returns
consumer.SuspendCurrentQueueAMoment, the message is put back into the
+ * local message queue for later consumption if msg.ReconsumeTimes <
MaxReconsumeTimes; otherwise, it is sent to the RocketMQ
+ * DLQ topic, and the message must be handled manually.
+ */
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithConsumerModel(consumer.Clustering),
+ consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
+ consumer.WithConsumerOrder(true),
+ consumer.WithMaxReconsumeTimes(5),
+ )
+
+ err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx
context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
+ fmt.Printf("orderly context: %v\n", orderlyCtx)
+ fmt.Printf("subscribe orderly callback len: %d \n", len(msgs))
+
+ for _, msg := range msgs {
+ if msg.ReconsumeTimes > 5 {
+ fmt.Printf("msg ReconsumeTimes > 5. msg: %v",
msg)
+ } else {
+ fmt.Printf("subscribe orderly callback: %v \n",
msg)
+ }
+ }
+ return consumer.SuspendCurrentQueueAMoment, nil
+
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ // Note: start after subscribe
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+}
diff --git a/internal/client.go b/internal/client.go
index 34870b3..ff4157d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -136,10 +136,6 @@ type RMQClient interface {
CheckClientInBroker()
SendHeartbeatToAllBrokerWithLock()
UpdateTopicRouteInfo()
- SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
- msgs []*primitive.Message, f func(result
*primitive.SendResult)) error
- SendMessageOneWay(ctx context.Context, brokerAddrs string, request
*SendMessageRequest,
- msgs []*primitive.Message) (*primitive.SendResult, error)
ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand,
resp *primitive.SendResult, msgs ...*primitive.Message) error
@@ -361,9 +357,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
consumer := value.(InnerConsumer)
list := consumer.SubscriptionDataList()
for idx := range list {
- if !strings.HasPrefix(list[idx].Topic,
RetryGroupTopicPrefix) {
- subscribedTopicSet[list[idx].Topic] = true
- }
+ subscribedTopicSet[list[idx].Topic] = true
}
return true
})
diff --git a/internal/constants.go b/internal/constants.go
index 5711750..a234c18 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -18,8 +18,9 @@ limitations under the License.
package internal
const (
- RetryGroupTopicPrefix = "%RETRY%"
- DefaultConsumerGroup = "DEFAULT_CONSUMER"
+ RetryGroupTopicPrefix = "%RETRY%"
+ DefaultConsumerGroup = "DEFAULT_CONSUMER"
+ ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
)
func GetRetryTopic(group string) string {
diff --git a/internal/request.go b/internal/request.go
index 85f3519..e09a986 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -31,6 +31,7 @@ const (
ReqSearchOffsetByTimestamp = int16(30)
ReqGetMaxOffset = int16(30)
ReqHeartBeat = int16(34)
+ ReqConsumerSendMsgBack = int16(36)
ReqGetConsumerListByGroup = int16(38)
ReqLockBatchMQ = int16(41)
ReqUnlockBatchMQ = int16(42)
@@ -80,6 +81,29 @@ func (request *SendMessageRequest) Decode(properties
map[string]string) error {
return nil
}
+type ConsumerSendMsgBackRequest struct {
+ Group string `json:"group"`
+ Offset int64 `json:"offset"`
+ DelayLevel int `json:"delayLevel"`
+ OriginMsgId string `json:"originMsgId"`
+ OriginTopic string `json:"originTopic"`
+ UnitMode bool `json:"unitMode"`
+ MaxReconsumeTimes int32 `json:"maxReconsumeTimes"`
+}
+
+func (request *ConsumerSendMsgBackRequest) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["group"] = request.Group
+ maps["offset"] = strconv.FormatInt(request.Offset, 10)
+ maps["delayLevel"] = strconv.Itoa(request.DelayLevel)
+ maps["originMsgId"] = request.OriginMsgId
+ maps["originTopic"] = request.OriginTopic
+ maps["unitMode"] = strconv.FormatBool(request.UnitMode)
+ maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes))
+
+ return maps
+}
+
type PullMessageRequest struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
diff --git a/internal/route.go b/internal/route.go
index 94fd5c0..8b1e713 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -65,6 +65,7 @@ var (
//subscribeInfoMap sync.Map
routeDataMap sync.Map
lockNamesrv sync.Mutex
+
)
func cleanOfflineBroker() {
@@ -254,7 +255,45 @@ func FetchSubscribeMessageQueues(topic string)
([]*primitive.MessageQueue, error
return mqs, nil
}
+func FindMQByTopic(topic string) *primitive.MessageQueue {
+ mqs, err := FetchPublishMessageQueues(topic)
+ if err != nil {
+ return nil
+ }
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ i := utils.AbsInt(r.Int())
+ return mqs[i%len(mqs)]
+}
+
+func FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue,
error) {
+ var (
+ err error
+ routeData *TopicRouteData
+ )
+
+ v, exist := routeDataMap.Load(topic)
+ if !exist {
+ routeData, err = queryTopicRouteInfoFromServer(topic)
+ if err != nil {
+ rlog.Error("queryTopicRouteInfoFromServer failed.
topic: %v", topic)
+ return nil, err
+ }
+ routeDataMap.Store(topic, routeData)
+ AddBroker(routeData)
+ } else {
+ routeData = v.(*TopicRouteData)
+ }
+
+ if err != nil {
+ return nil, err
+ }
+ publishinfo := routeData2PublishInfo(topic, routeData)
+
+ return publishinfo.MqList, nil
+}
+
func findBrokerVersion(brokerName, brokerAddr string) int32 {
+
versions, exist := brokerVersionMap.Load(brokerName)
if !exist {
diff --git a/primitive/ctx.go b/primitive/ctx.go
index a10319f..6e9b6d3 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -20,7 +20,10 @@ limitations under the License.
*/
package primitive
-import "context"
+import (
+ "context"
+ "math"
+)
type CtxKey int
@@ -28,6 +31,7 @@ const (
method CtxKey = iota
msgCtx
orderlyCtx
+ concurrentlyCtx
// method name in producer
SendSync = "SendSync"
@@ -91,3 +95,24 @@ func GetOrderlyCtx(ctx context.Context)
(*ConsumeOrderlyContext, bool) {
c, exist := ctx.Value(orderlyCtx).(*ConsumeOrderlyContext)
return c, exist
}
+
+type ConsumeConcurrentlyContext struct {
+ MQ MessageQueue
+ DelayLevelWhenNextConsume int
+ AckIndex int32
+}
+
+func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext {
+ return &ConsumeConcurrentlyContext{
+ AckIndex: math.MaxInt32,
+ }
+}
+
+func WithConcurrentlyCtx(ctx context.Context, c *ConsumeConcurrentlyContext)
context.Context {
+ return context.WithValue(ctx, concurrentlyCtx, c)
+}
+
+func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext,
bool) {
+ c, exist := ctx.Value(concurrentlyCtx).(*ConsumeConcurrentlyContext)
+ return c, exist
+}