This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new f9521f9  Prevent pc.prch blocking (#327)
f9521f9 is described below

commit f9521f97be4c50033ef15c5c435e375d2a63dd39
Author: 兰园望月 <[email protected]>
AuthorDate: Wed Dec 11 15:45:18 2019 +0800

    Prevent pc.prch blocking (#327)
---
 consumer/push_consumer.go | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c915a32..6d70393 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -148,20 +148,16 @@ func (pc *pushConsumer) Start() error {
                        return
                }
 
-               pc.Rebalance()
-               time.Sleep(1 * time.Second)
-
                go func() {
-                       // initial lock.
-                       time.Sleep(1000 * time.Millisecond)
-                       pc.lockAll()
-
+                       // todo start clean msg expired
                        for {
                                select {
-                               case <-pc.lockTicker.C:
-                                       pc.lockAll()
+                               case pr := <-pc.prCh:
+                                       go func() {
+                                               pc.pullMessage(&pr)
+                                       }()
                                case <-pc.done:
-                                       rlog.Info("push consumer close tick.", 
map[string]interface{}{
+                                       rlog.Info("push consumer close 
pullConsumer listener.", map[string]interface{}{
                                                rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
                                        })
                                        return
@@ -169,16 +165,20 @@ func (pc *pushConsumer) Start() error {
                        }
                }()
 
+               pc.Rebalance()
+               time.Sleep(1 * time.Second)
+
                go func() {
-                       // todo start clean msg expired
+                       // initial lock.
+                       time.Sleep(1000 * time.Millisecond)
+                       pc.lockAll()
+
                        for {
                                select {
-                               case pr := <-pc.prCh:
-                                       go func() {
-                                               pc.pullMessage(&pr)
-                                       }()
+                               case <-pc.lockTicker.C:
+                                       pc.lockAll()
                                case <-pc.done:
-                                       rlog.Info("push consumer close 
pullConsumer listener.", map[string]interface{}{
+                                       rlog.Info("push consumer close tick.", 
map[string]interface{}{
                                                rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
                                        })
                                        return

Reply via email to