This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 82bd638  [ISSUE #271] Fix bugs that can't consume multiple topics in 
single consumer (#310)
82bd638 is described below

commit 82bd6384a2262c8de493115d9c703c1bbb3d5d2f
Author: wenfeng <[email protected]>
AuthorDate: Tue Nov 26 19:00:12 2019 +0800

    [ISSUE #271] Fix bugs that can't consume multiple topics in single consumer 
(#310)
    
    * fix issue 271
---
 .travis.yml               |  6 +++---
 consumer/push_consumer.go | 43 ++++++++++++++++++++++++++++++++++++-------
 internal/client.go        |  7 +++++++
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index e68fd31..2562e74 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@ env:
 
 before_script:
   - cd ${TRAVIS_HOME}
-  - wget 
http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip
-  - unzip rocketmq-all-4.5.2-bin-release.zip
-  - cd rocketmq-all-4.5.2-bin-release
+  - wget 
http://us.mirrors.quenda.co/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
+  - unzip rocketmq-all-4.6.0-bin-release.zip
+  - cd rocketmq-all-4.6.0-bin-release
   - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
   - nohup sh bin/mqnamesrv &
   - nohup sh bin/mqbroker -n localhost:9876 &
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index aa999c5..3c5b79a 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -29,6 +29,7 @@ import (
 
        "github.com/apache/rocketmq-client-go/internal"
        "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/internal/utils"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
 )
@@ -46,11 +47,20 @@ const (
        Mb = 1024 * 1024
 )
 
+type PushConsumerCallback struct {
+       topic string
+       f     func(context.Context, ...*primitive.MessageExt) (ConsumeResult, 
error)
+}
+
+func (callback PushConsumerCallback) UniqueID() string {
+       return callback.topic
+}
+
 type pushConsumer struct {
        *defaultConsumer
        queueFlowControlTimes        int
        queueMaxSpanFlowControlTimes int
-       consume                      func(context.Context, 
...*primitive.MessageExt) (ConsumeResult, error)
+       consumeFunc                  utils.Set
        submitToConsume              func(*processQueue, 
*primitive.MessageQueue)
        subscribedTopic              map[string]string
        interceptor                  primitive.Interceptor
@@ -98,6 +108,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                queueLock:       newQueueLock(),
                lockTicker:      
time.NewTicker(dc.option.RebalanceLockInterval),
                done:            make(chan struct{}, 1),
+               consumeFunc:     utils.NewSet(),
        }
        dc.mqChanged = p.messageQueueChanged
        if p.consumeOrderly {
@@ -111,7 +122,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) 
{
        return p, nil
 }
 
-// TODO: add shutdown on pushConsumer.
 func (pc *pushConsumer) Start() error {
        var err error
        pc.once.Do(func() {
@@ -123,13 +133,14 @@ func (pc *pushConsumer) Start() error {
                pc.state = internal.StateStartFailed
                pc.validate()
 
-               err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
+               err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
                if err != nil {
                        pc.state = internal.StateStartFailed
                        rlog.Error("the consumer group has been created, 
specify another one", map[string]interface{}{
                                rlog.LogKeyConsumerGroup: pc.consumerGroup,
                        })
                        err = ErrCreated
+                       return
                }
 
                err = pc.defaultConsumer.start()
@@ -160,7 +171,6 @@ func (pc *pushConsumer) Start() error {
 
                go func() {
                        // todo start clean msg expired
-                       // TODO quit
                        for {
                                select {
                                case pr := <-pc.prCh:
@@ -177,6 +187,10 @@ func (pc *pushConsumer) Start() error {
                }()
        })
 
+       if err != nil {
+               return err
+       }
+
        pc.client.UpdateTopicRouteInfo()
        for k := range pc.subscribedTopic {
                _, exist := pc.topicSubscribeInfoTable.Load(k)
@@ -224,7 +238,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector 
MessageSelector,
                pc.subscribedTopic[retryTopic] = ""
        }
 
-       pc.consume = f
+       pc.consumeFunc.Add(&PushConsumerCallback{
+               f:     f,
+               topic: topic,
+       })
        return nil
 }
 
@@ -752,14 +769,26 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq 
*primitive.MessageQueue
 }
 
 func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs 
[]*primitive.MessageExt) (ConsumeResult, error) {
+       if len(subMsgs) == 0 {
+               return ConsumeRetryLater, errors.New("msg list empty")
+       }
 
+       f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
+       if !exist {
+               return ConsumeRetryLater, fmt.Errorf("the consume callback 
missing for topic: %s", subMsgs[0].Topic)
+       }
+
+       callback, ok := f.(*PushConsumerCallback)
+       if !ok {
+               return ConsumeRetryLater, fmt.Errorf("the consume callback 
assert failed for topic: %s", subMsgs[0].Topic)
+       }
        if pc.interceptor == nil {
-               return pc.consume(ctx, subMsgs...)
+               return callback.f(ctx, subMsgs...)
        } else {
                var container ConsumeResultHolder
                err := pc.interceptor(ctx, subMsgs, &container, func(ctx 
context.Context, req, reply interface{}) error {
                        msgs := req.([]*primitive.MessageExt)
-                       r, e := pc.consume(ctx, msgs...)
+                       r, e := callback.f(ctx, msgs...)
 
                        realReply := reply.(*ConsumeResultHolder)
                        realReply.ConsumeResult = r
diff --git a/internal/client.go b/internal/client.go
index f66e8b0..3336944 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -565,6 +565,13 @@ func (c *rmqClient) PullMessageAsync(ctx context.Context, 
brokerAddrs string, re
 }
 
 func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
+       _, exist := c.consumerMap.Load(group)
+       if exist {
+               rlog.Warning("the consumer group exist already", 
map[string]interface{}{
+                       rlog.LogKeyConsumerGroup: group,
+               })
+               return fmt.Errorf("the consumer group exist already")
+       }
        c.consumerMap.Store(group, consumer)
        return nil
 }

Reply via email to