This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new f9521f9 Prevent pc.prch blocking (#327)
f9521f9 is described below
commit f9521f97be4c50033ef15c5c435e375d2a63dd39
Author: 兰园望月 <[email protected]>
AuthorDate: Wed Dec 11 15:45:18 2019 +0800
Prevent pc.prch blocking (#327)
---
consumer/push_consumer.go | 32 ++++++++++++++++----------------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c915a32..6d70393 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -148,20 +148,16 @@ func (pc *pushConsumer) Start() error {
return
}
- pc.Rebalance()
- time.Sleep(1 * time.Second)
-
go func() {
- // initial lock.
- time.Sleep(1000 * time.Millisecond)
- pc.lockAll()
-
+ // todo start clean msg expired
for {
select {
- case <-pc.lockTicker.C:
- pc.lockAll()
+ case pr := <-pc.prCh:
+ go func() {
+ pc.pullMessage(&pr)
+ }()
case <-pc.done:
- rlog.Info("push consumer close tick.",
map[string]interface{}{
+ rlog.Info("push consumer close
pullConsumer listener.", map[string]interface{}{
rlog.LogKeyConsumerGroup:
pc.consumerGroup,
})
return
@@ -169,16 +165,20 @@ func (pc *pushConsumer) Start() error {
}
}()
+ pc.Rebalance()
+ time.Sleep(1 * time.Second)
+
go func() {
- // todo start clean msg expired
+ // initial lock.
+ time.Sleep(1000 * time.Millisecond)
+ pc.lockAll()
+
for {
select {
- case pr := <-pc.prCh:
- go func() {
- pc.pullMessage(&pr)
- }()
+ case <-pc.lockTicker.C:
+ pc.lockAll()
case <-pc.done:
- rlog.Info("push consumer close
pullConsumer listener.", map[string]interface{}{
+ rlog.Info("push consumer close tick.",
map[string]interface{}{
rlog.LogKeyConsumerGroup:
pc.consumerGroup,
})
return