This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 52f5a0c  [ISSUE #118]add messaage sendback. resolve #118 (#119)
52f5a0c is described below

commit 52f5a0c54884d73a5e3f0e4516b98b18309b5238
Author: xujianhai666 <[email protected]>
AuthorDate: Thu Jul 18 11:11:39 2019 +0800

    [ISSUE #118]add messaage sendback. resolve #118 (#119)
    
    #118
---
 consumer/consumer.go                       |  3 -
 consumer/option.go                         |  8 +++
 consumer/process_queue.go                  | 10 ++++
 consumer/push_consumer.go                  | 95 +++++++++++++++++++++++-------
 examples/consumer/retry/concurrent/main.go | 73 +++++++++++++++++++++++
 examples/consumer/retry/order/main.go      | 72 ++++++++++++++++++++++
 internal/client.go                         |  8 +--
 internal/constants.go                      |  5 +-
 internal/request.go                        | 24 ++++++++
 internal/route.go                          | 39 ++++++++++++
 primitive/ctx.go                           | 27 ++++++++-
 11 files changed, 329 insertions(+), 35 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 7f80802..25861f9 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -352,9 +352,6 @@ func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic 
string) bool {
 func (dc *defaultConsumer) doBalance() {
        dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
                topic := key.(string)
-               if strings.HasPrefix(topic, internal.RetryGroupTopicPrefix) {
-                       return true
-               }
                v, exist := dc.topicSubscribeInfoTable.Load(topic)
                if !exist {
                        rlog.Warnf("do balance of group: %s, but topic: %s does 
not exist.", dc.consumerGroup, topic)
diff --git a/consumer/option.go b/consumer/option.go
index f967e1e..dafff61 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -194,3 +194,11 @@ func WithCredentials(c primitive.Credentials) Option {
                options.ClientOptions.Credentials = c
        }
 }
+
+// WithMaxReconsumeTimes sets MaxReconsumeTimes of options. If a message is 
reconsumed more than MaxReconsumeTimes times, it will
+// be sent to the retry or DLQ topic. For more info, see 
examples/consumer/retry.
+func WithMaxReconsumeTimes(times int32) Option {
+       return func(opts *consumerOptions) {
+               opts.MaxReconsumeTimes = times
+       }
+}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 71b52bf..869ea5d 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -110,6 +110,16 @@ func (pq *processQueue) putMessage(messages 
...*primitive.MessageExt) {
        }
 }
 
+func (pq *processQueue) makeMessageToCosumeAgain(messages 
...*primitive.MessageExt) {
+       pq.mutex.Lock()
+       for _, msg := range messages {
+               pq.consumingMsgOrderlyTreeMap.Remove(msg.QueueOffset)
+               pq.msgCache.Put(msg.QueueOffset, msg)
+       }
+
+       pq.mutex.Unlock()
+}
+
 func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 
{
        result := int64(-1)
        pq.mutex.Lock()
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 1b8c7dc..3b011c3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -25,6 +25,7 @@ import (
        "time"
 
        "github.com/apache/rocketmq-client-go/internal"
+       "github.com/apache/rocketmq-client-go/internal/remote"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/pkg/errors"
@@ -50,10 +51,8 @@ type pushConsumer struct {
        consume                      func(context.Context, 
...*primitive.MessageExt) (ConsumeResult, error)
        submitToConsume              func(*processQueue, 
*primitive.MessageQueue)
        subscribedTopic              map[string]string
-
-       interceptor primitive.Interceptor
-
-       queueLock *QueueLock
+       interceptor                  primitive.Interceptor
+       queueLock                    *QueueLock
 }
 
 func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -191,6 +190,15 @@ func (pc *pushConsumer) Subscribe(topic string, selector 
MessageSelector,
        data := buildSubscriptionData(topic, selector)
        pc.subscriptionDataTable.Store(topic, data)
        pc.subscribedTopic[topic] = ""
+
+       if pc.option.ConsumerModel == Clustering {
+               // add retry topic for clustering mode
+               retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+               data = buildSubscriptionData(retryTopic, 
MessageSelector{Expression: _SubAll})
+               pc.subscriptionDataTable.Store(retryTopic, data)
+               pc.subscribedTopic[retryTopic] = ""
+       }
+
        pc.consume = f
        return nil
 }
@@ -512,10 +520,33 @@ func (pc *pushConsumer) correctTagsOffset(pr 
*PullRequest) {
        // TODO
 }
 
-func (pc *pushConsumer) sendMessageBack(ctx *primitive.ConsumeMessageContext, 
msg *primitive.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(brokerName string, msg 
*primitive.MessageExt, delayLevel int) bool {
+       var brokerAddr string
+       if len(brokerName) != 0 {
+               brokerAddr = internal.FindBrokerAddrByName(brokerName)
+       } else {
+               brokerAddr = msg.StoreHost
+       }
+       _, err := pc.client.InvokeSync(brokerAddr, pc.buildSendBackRequest(msg, 
delayLevel), 3*time.Second)
+       if err != nil {
+               return false
+       }
        return true
 }
 
+func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, 
delayLevel int) *remote.RemotingCommand {
+       req := &internal.ConsumerSendMsgBackRequest{
+               Group:             pc.consumerGroup,
+               OriginTopic:       msg.Topic,
+               Offset:            msg.CommitLogOffset,
+               DelayLevel:        delayLevel,
+               OriginMsgId:       msg.MsgId,
+               MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+       }
+
+       return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, 
msg.Body)
+}
+
 func (pc *pushConsumer) suspend() {
        pc.pause = true
        rlog.Infof("suspend consumer: %s", pc.consumerGroup)
@@ -615,6 +646,23 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, 
subMsgs []*primitive.M
        }
 }
 
+// resetRetryAndNamespace modify retry message.
+func (pc *pushConsumer) resetRetryAndNamespace(subMsgs 
[]*primitive.MessageExt) {
+       groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
+       beginTime := time.Now()
+       for idx := range subMsgs {
+               msg := subMsgs[idx]
+               if msg.Properties != nil {
+                       retryTopic := 
msg.Properties[primitive.PropertyRetryTopic]
+                       if retryTopic == "" && groupTopic == msg.Topic {
+                               msg.Topic = retryTopic
+                       }
+                       
subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
+                               beginTime.UnixNano()/int64(time.Millisecond), 
10)
+               }
+       }
+}
+
 func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq 
*primitive.MessageQueue) {
        msgs := pq.getMessages()
        if msgs == nil {
@@ -640,18 +688,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
 
                        // TODO hook
                        beginTime := time.Now()
-                       groupTopic := internal.RetryGroupTopicPrefix + 
pc.consumerGroup
-                       for idx := range subMsgs {
-                               msg := subMsgs[idx]
-                               if msg.Properties != nil {
-                                       retryTopic := 
msg.Properties[primitive.PropertyRetryTopic]
-                                       if retryTopic == "" && groupTopic == 
msg.Topic {
-                                               msg.Topic = retryTopic
-                                       }
-                                       
subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
-                                               
beginTime.UnixNano()/int64(time.Millisecond), 10)
-                               }
-                       }
+                       pc.resetRetryAndNamespace(subMsgs)
                        var result ConsumeResult
 
                        var err error
@@ -661,6 +698,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                        ctx := context.Background()
                        ctx = primitive.WithConsumerCtx(ctx, msgCtx)
                        ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
+                       concurrentCtx := 
primitive.NewConsumeConcurrentlyContext()
+                       concurrentCtx.MQ = *mq
+                       ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
 
                        result, err = pc.consumeInner(ctx, subMsgs)
 
@@ -691,7 +731,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        } else {
                                                for i := 0; i < len(msgs); i++ {
                                                        msg := msgs[i]
-                                                       if 
!pc.sendMessageBack(msgCtx, msg) {
+                                                       if 
!pc.sendMessageBack(mq.BrokerName, msg, 
concurrentCtx.DelayLevelWhenNextConsume) {
                                                                
msg.ReconsumeTimes += 1
                                                                msgBackFailed = 
append(msgBackFailed, msg)
                                                        }
@@ -755,6 +795,8 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                        batchSize := pc.option.ConsumeMessageBatchMaxSize
                        msgs := pq.takeMessages(batchSize)
 
+                       pc.resetRetryAndNamespace(msgs)
+
                        if len(msgs) == 0 {
                                continueConsume = false
                                break
@@ -804,6 +846,7 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                                        commitOffset = pq.commit()
                                case SuspendCurrentQueueAMoment:
                                        if (pc.checkReconsumeTimes(msgs)) {
+                                               pq.putMessage(msgs...)
                                                
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * 
time.Millisecond)
                                                continueConsume = false;
                                        } else {
@@ -843,12 +886,12 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
 func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt) bool 
{
        suspend := false
        if len(msgs) != 0 {
-               maxReconsumeTimes := pc.getMaxReconsumeTimes()
+               maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
                for _, msg := range msgs {
                        if msg.ReconsumeTimes > maxReconsumeTimes {
+                               rlog.Warn("msg will be send to retry topic due 
to ReconsumeTimes > %d, \n", maxReconsumeTimes)
                                msg.Properties["RECONSUME_TIME"] = 
strconv.Itoa(int(msg.ReconsumeTimes))
-                               if !pc.sendMessageBack(nil, msg) {
-                                       // TODO: complete sendMessageBack
+                               if !pc.sendMessageBack("", msg, -1) {
                                        suspend = true
                                        msg.ReconsumeTimes += 1
                                }
@@ -861,7 +904,7 @@ func (pc *pushConsumer) checkReconsumeTimes(msgs 
[]*primitive.MessageExt) bool {
        return suspend
 }
 
-func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
+func (pc *pushConsumer) getOrderlyMaxReconsumeTimes() int32 {
        if pc.option.MaxReconsumeTimes == -1 {
                return math.MaxInt32
        } else {
@@ -869,6 +912,14 @@ func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
        }
 }
 
+func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
+       if pc.option.MaxReconsumeTimes == -1 {
+               return 16
+       } else {
+               return pc.option.MaxReconsumeTimes
+       }
+}
+
 func (pc *pushConsumer) tryLocakLaterAndReconsume(mq *primitive.MessageQueue, 
delay int64) {
        time.Sleep(time.Duration(delay) * time.Millisecond)
        if pc.lock(mq) == true {
diff --git a/examples/consumer/retry/concurrent/main.go 
b/examples/consumer/retry/concurrent/main.go
new file mode 100644
index 0000000..8a21f83
--- /dev/null
+++ b/examples/consumer/retry/concurrent/main.go
@@ -0,0 +1,73 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * Use the concurrent consumer model: when the Subscribe callback returns 
consumer.ConsumeRetryLater, the message will be
+ * sent to the RocketMQ retry topic. We can set DelayLevelWhenNextConsume in 
ConsumeConcurrentlyContext, which is used to
+ * indicate the delay before the message is re-sent to the origin topic from the retry topic.
+ * In this example, we always set DelayLevelWhenNextConsume=1, which means that the 
message will be sent to the origin topic after
+ * 1s. To avoid unlimited retries, we return consumer.ConsumeSuccess 
once ReconsumeTimes > 5.
+ */
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go"
+       "github.com/apache/rocketmq-client-go/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+       c, _ := rocketmq.NewPushConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+               consumer.WithConsumerModel(consumer.Clustering),
+       )
+
+       retryLevel := 1 // delay level 1: the message is redelivered about 1s later
+       err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx 
context.Context,
+               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+               fmt.Printf("subscribe callback len: %d \n", len(msgs))
+
+               concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
+               concurrentCtx.DelayLevelWhenNextConsume = retryLevel // only 
run when return consumer.ConsumeRetryLater
+
+               for _, msg := range msgs {
+                       if msg.ReconsumeTimes > 5 {
+                               fmt.Printf("msg ReconsumeTimes > 5. msg: %v", 
msg)
+                               return consumer.ConsumeSuccess, nil
+                       } else {
+                               fmt.Printf("subscribe callback: %v \n", msg)
+                       }
+               }
+               return consumer.ConsumeRetryLater, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+}
diff --git a/examples/consumer/retry/order/main.go 
b/examples/consumer/retry/order/main.go
new file mode 100644
index 0000000..f74ef68
--- /dev/null
+++ b/examples/consumer/retry/order/main.go
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * Use the orderly consumer model: when the Subscribe callback returns 
consumer.SuspendCurrentQueueAMoment, the message is put back into the
+ * local message queue for later consumption if msg.ReconsumeTimes < 
MaxReconsumeTimes; otherwise, it is sent to the RocketMQ
+ * DLQ topic, where it must be handled manually.
+ */
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go"
+       "github.com/apache/rocketmq-client-go/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+       c, _ := rocketmq.NewPushConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+               consumer.WithConsumerModel(consumer.Clustering),
+               consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
+               consumer.WithConsumerOrder(true),
+               consumer.WithMaxReconsumeTimes(5),
+       )
+
+       err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx 
context.Context,
+               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+               orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
+               fmt.Printf("orderly context: %v\n", orderlyCtx)
+               fmt.Printf("subscribe orderly callback len: %d \n", len(msgs))
+
+               for _, msg := range msgs {
+                       if msg.ReconsumeTimes > 5 {
+                               fmt.Printf("msg ReconsumeTimes > 5. msg: %v", 
msg)
+                       } else {
+                               fmt.Printf("subscribe orderly callback: %v \n", 
msg)
+                       }
+               }
+               return consumer.SuspendCurrentQueueAMoment, nil
+
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+}
diff --git a/internal/client.go b/internal/client.go
index 34870b3..ff4157d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -136,10 +136,6 @@ type RMQClient interface {
        CheckClientInBroker()
        SendHeartbeatToAllBrokerWithLock()
        UpdateTopicRouteInfo()
-       SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
-               msgs []*primitive.Message, f func(result 
*primitive.SendResult)) error
-       SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest,
-               msgs []*primitive.Message) (*primitive.SendResult, error)
 
        ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, 
resp *primitive.SendResult, msgs ...*primitive.Message) error
 
@@ -361,9 +357,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
                consumer := value.(InnerConsumer)
                list := consumer.SubscriptionDataList()
                for idx := range list {
-                       if !strings.HasPrefix(list[idx].Topic, 
RetryGroupTopicPrefix) {
-                               subscribedTopicSet[list[idx].Topic] = true
-                       }
+                       subscribedTopicSet[list[idx].Topic] = true
                }
                return true
        })
diff --git a/internal/constants.go b/internal/constants.go
index 5711750..a234c18 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -18,8 +18,9 @@ limitations under the License.
 package internal
 
 const (
-       RetryGroupTopicPrefix = "%RETRY%"
-       DefaultConsumerGroup  = "DEFAULT_CONSUMER"
+       RetryGroupTopicPrefix    = "%RETRY%"
+       DefaultConsumerGroup     = "DEFAULT_CONSUMER"
+       ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
 )
 
 func GetRetryTopic(group string) string {
diff --git a/internal/request.go b/internal/request.go
index 85f3519..e09a986 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -31,6 +31,7 @@ const (
        ReqSearchOffsetByTimestamp  = int16(30)
        ReqGetMaxOffset             = int16(30)
        ReqHeartBeat                = int16(34)
+       ReqConsumerSendMsgBack      = int16(36)
        ReqGetConsumerListByGroup   = int16(38)
        ReqLockBatchMQ              = int16(41)
        ReqUnlockBatchMQ            = int16(42)
@@ -80,6 +81,29 @@ func (request *SendMessageRequest) Decode(properties 
map[string]string) error {
        return nil
 }
 
+type ConsumerSendMsgBackRequest struct {
+       Group             string `json:"group"`
+       Offset            int64  `json:"offset"`
+       DelayLevel        int    `json:"delayLevel"`
+       OriginMsgId       string `json:"originMsgId"`
+       OriginTopic       string `json:"originTopic"`
+       UnitMode          bool   `json:"unitMode"`
+       MaxReconsumeTimes int32  `json:"maxReconsumeTimes"`
+}
+
+func (request *ConsumerSendMsgBackRequest) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["group"] = request.Group
+       maps["offset"] = strconv.FormatInt(request.Offset, 10)
+       maps["delayLevel"] = strconv.Itoa(request.DelayLevel)
+       maps["originMsgId"] = request.OriginMsgId
+       maps["originTopic"] = request.OriginTopic
+       maps["unitMode"] = strconv.FormatBool(request.UnitMode)
+       maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes))
+
+       return maps
+}
+
 type PullMessageRequest struct {
        ConsumerGroup        string        `json:"consumerGroup"`
        Topic                string        `json:"topic"`
diff --git a/internal/route.go b/internal/route.go
index 94fd5c0..8b1e713 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -65,6 +65,7 @@ var (
        //subscribeInfoMap sync.Map
        routeDataMap sync.Map
        lockNamesrv  sync.Mutex
+
 )
 
 func cleanOfflineBroker() {
@@ -254,7 +255,45 @@ func FetchSubscribeMessageQueues(topic string) 
([]*primitive.MessageQueue, error
        return mqs, nil
 }
 
+func FindMQByTopic(topic string) *primitive.MessageQueue {
+       mqs, err := FetchPublishMessageQueues(topic)
+       if err != nil {
+               return nil
+       }
+       r := rand.New(rand.NewSource(time.Now().UnixNano()))
+       i := utils.AbsInt(r.Int())
+       return mqs[i%len(mqs)]
+}
+
+func FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, 
error) {
+       var (
+               err       error
+               routeData *TopicRouteData
+       )
+
+       v, exist := routeDataMap.Load(topic)
+       if !exist {
+               routeData, err = queryTopicRouteInfoFromServer(topic)
+               if err != nil {
+                       rlog.Error("queryTopicRouteInfoFromServer failed. 
topic: %v", topic)
+                       return nil, err
+               }
+               routeDataMap.Store(topic, routeData)
+               AddBroker(routeData)
+       } else {
+               routeData = v.(*TopicRouteData)
+       }
+
+       if err != nil {
+               return nil, err
+       }
+       publishinfo := routeData2PublishInfo(topic, routeData)
+
+       return publishinfo.MqList, nil
+}
+
 func findBrokerVersion(brokerName, brokerAddr string) int32 {
+
        versions, exist := brokerVersionMap.Load(brokerName)
 
        if !exist {
diff --git a/primitive/ctx.go b/primitive/ctx.go
index a10319f..6e9b6d3 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -20,7 +20,10 @@ limitations under the License.
  */
 package primitive
 
-import "context"
+import (
+       "context"
+       "math"
+)
 
 type CtxKey int
 
@@ -28,6 +31,7 @@ const (
        method CtxKey = iota
        msgCtx
        orderlyCtx
+       concurrentlyCtx
 
        // method name in  producer
        SendSync   = "SendSync"
@@ -91,3 +95,24 @@ func GetOrderlyCtx(ctx context.Context) 
(*ConsumeOrderlyContext, bool) {
        c, exist := ctx.Value(orderlyCtx).(*ConsumeOrderlyContext)
        return c, exist
 }
+
+type ConsumeConcurrentlyContext struct {
+       MQ                        MessageQueue
+       DelayLevelWhenNextConsume int
+       AckIndex                  int32
+}
+
+func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext {
+       return &ConsumeConcurrentlyContext{
+               AckIndex: math.MaxInt32,
+       }
+}
+
+func WithConcurrentlyCtx(ctx context.Context, c *ConsumeConcurrentlyContext) 
context.Context {
+       return context.WithValue(ctx, concurrentlyCtx, c)
+}
+
+func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, 
bool) {
+       c, exist := ctx.Value(concurrentlyCtx).(*ConsumeConcurrentlyContext)
+       return c, exist
+}

Reply via email to