This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 4f9013e  [ISSUE #402] fix(consumer): improve retry topic support (#403)
4f9013e is described below

commit 4f9013ed90a244cc49fc1b392d6c967a2631c5e9
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Feb 12 15:05:18 2020 +0800

    [ISSUE #402] fix(consumer): improve retry topic support (#403)
    
    * fix(consumer): improve retry topic support
---
 consumer/consumer.go      |  1 -
 consumer/push_consumer.go | 13 +---------
 internal/client.go        | 62 ++++++++++++++++++++++++++++++++---------------
 3 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 5a5d7d6..c46917e 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -290,7 +290,6 @@ func (dc *defaultConsumer) start() error {
                dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
        }
 
-       dc.client.UpdateTopicRouteInfo()
        dc.client.Start()
        atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
        dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c84e524..3bef44c 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -163,9 +163,6 @@ func (pc *pushConsumer) Start() error {
                        }
                }()
 
-               pc.Rebalance()
-               time.Sleep(1 * time.Second)
-
                go primitive.WithRecover(func() {
                        // initial lock.
                        if !pc.consumeOrderly {
@@ -203,9 +200,9 @@ func (pc *pushConsumer) Start() error {
                        return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
                }
        }
-       pc.client.RebalanceImmediately()
        pc.client.CheckClientInBroker()
        pc.client.SendHeartbeatToAllBrokerWithLock()
+       pc.client.RebalanceImmediately()
 
        return err
 }
@@ -234,14 +231,6 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
        pc.subscriptionDataTable.Store(topic, data)
        pc.subscribedTopic[topic] = ""
 
-       if pc.option.ConsumerModel == Clustering {
-               // add retry topic for clustering mode
-               retryTopic := internal.GetRetryTopic(pc.consumerGroup)
-               retryData := buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
-               pc.subscriptionDataTable.Store(retryTopic, retryData)
-               pc.subscribedTopic[retryTopic] = ""
-       }
-
        pc.consumeFunc.Add(&PushConsumerCallback{
                f:     f,
                topic: topic,
diff --git a/internal/client.go b/internal/client.go
index 08bbcb3..56bda03 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -262,15 +262,19 @@ func (c *rmqClient) Start() {
                }
                // fetchNameServerAddr
                if len(c.option.NameServerAddrs) == 0 {
-                       go func() {
-                               // delay
-                               ticker := time.NewTicker(60 * 2 * time.Second)
+                       go primitive.WithRecover(func() {
+                               op := func() {
+                                       c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
+                               }
+                               time.Sleep(10 * time.Second)
+                               op()
+
+                               ticker := time.NewTicker(2 * time.Minute)
                                defer ticker.Stop()
-                               time.Sleep(50 * time.Millisecond)
                                for {
                                        select {
                                        case <-ticker.C:
-                                               c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
+                                               op()
                                        case <-c.done:
                                                rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
                                                        "clientID": c.ClientID(),
@@ -278,19 +282,24 @@ func (c *rmqClient) Start() {
                                                return
                                        }
                                }
-                       }()
+                       })
                }
 
                // schedule update route info
                go primitive.WithRecover(func() {
                        // delay
+                       op := func() {
+                               c.UpdateTopicRouteInfo()
+                       }
+                       time.Sleep(10 * time.Millisecond)
+                       op()
+
                        ticker := time.NewTicker(_PullNameServerInterval)
                        defer ticker.Stop()
-                       time.Sleep(50 * time.Millisecond)
                        for {
                                select {
                                case <-ticker.C:
-                                       c.UpdateTopicRouteInfo()
+                                       op()
                                case <-c.done:
                                        rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
                                                "clientID": c.ClientID(),
@@ -301,13 +310,20 @@ func (c *rmqClient) Start() {
                })
 
                go primitive.WithRecover(func() {
+                       op := func() {
+                               c.namesrvs.cleanOfflineBroker()
+                               c.SendHeartbeatToAllBrokerWithLock()
+                       }
+
+                       time.Sleep(time.Second)
+                       op()
+
                        ticker := time.NewTicker(_HeartbeatBrokerInterval)
                        defer ticker.Stop()
                        for {
                                select {
                                case <-ticker.C:
-                                       c.namesrvs.cleanOfflineBroker()
-                                       c.SendHeartbeatToAllBrokerWithLock()
+                                       op()
                                case <-c.done:
                                        rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
                                                "clientID": c.ClientID(),
@@ -319,21 +335,27 @@ func (c *rmqClient) Start() {
 
                // schedule persist offset
                go primitive.WithRecover(func() {
+                       op := func() {
+                               c.consumerMap.Range(func(key, value interface{}) bool {
+                                       consumer := value.(InnerConsumer)
+                                       err := consumer.PersistConsumerOffset()
+                                       if err != nil {
+                                               rlog.Error("persist offset failed", map[string]interface{}{
+                                                       rlog.LogKeyUnderlayError: err,
+                                               })
+                                       }
+                                       return true
+                               })
+                       }
+                       time.Sleep(10 * time.Second)
+                       op()
+
                        ticker := time.NewTicker(_PersistOffsetInterval)
                        defer ticker.Stop()
                        for {
                                select {
                                case <-ticker.C:
-                                       c.consumerMap.Range(func(key, value interface{}) bool {
-                                               consumer := value.(InnerConsumer)
-                                               err := consumer.PersistConsumerOffset()
-                                               if err != nil {
-                                                       rlog.Error("persist offset failed", map[string]interface{}{
-                                                               rlog.LogKeyUnderlayError: err,
-                                                       })
-                                               }
-                                               return true
-                                       })
+                                       op()
                                case <-c.done:
                                        rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
                                                "clientID": c.ClientID(),

Reply via email to