This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f06d9a  when queue dropped,should'not consume the process 
queue.(#905) (#906)
3f06d9a is described below

commit 3f06d9ad2161ad900177f78ceadbe5f3647c1fc7
Author: Nick <[email protected]>
AuthorDate: Thu Sep 1 19:52:22 2022 +0800

    when queue dropped,should'not consume the process queue.(#905) (#906)
    
    * when queue dropped,should'not consume the process queue.(#905)
    
    * poll method use private properties
---
 consumer/pull_consumer.go           | 23 +++++++++++++++++++++++
 examples/consumer/pull/poll/main.go |  2 ++
 2 files changed, 25 insertions(+)

diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 1ca4881..6c24c63 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -56,6 +56,14 @@ func (cr *ConsumeRequest) GetMsgList() 
[]*primitive.MessageExt {
        return cr.msgList
 }
 
+func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue {
+       return cr.messageQueue
+}
+
+func (cr *ConsumeRequest) GetPQ() *processQueue {
+       return cr.processQueue
+}
+
 type defaultPullConsumer struct {
        *defaultConsumer
 
@@ -193,6 +201,14 @@ func (pc *defaultPullConsumer) Poll(ctx context.Context, 
timeout time.Duration)
        case <-ctx.Done():
                return nil, ErrNoNewMsg
        case cr := <-pc.consumeRequestCache:
+               if cr.processQueue.IsDroppd() {
+                       rlog.Info("defaultPullConsumer poll the message queue 
not be able to consume, because it was dropped", map[string]interface{}{
+                               rlog.LogKeyMessageQueue:  
cr.messageQueue.String(),
+                               rlog.LogKeyConsumerGroup: pc.consumerGroup,
+                       })
+                       return nil, ErrNoNewMsg
+               }
+
                if len(cr.GetMsgList()) == 0 {
                        return nil, ErrNoNewMsg
                }
@@ -780,6 +796,13 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq 
*processQueue, mq *pri
        if msgList == nil {
                return
        }
+       if pq.IsDroppd() {
+               rlog.Info("defaultPullConsumer consumeMessageCurrently the 
message queue not be able to consume, because it was dropped", 
map[string]interface{}{
+                       rlog.LogKeyMessageQueue:  mq.String(),
+                       rlog.LogKeyConsumerGroup: pc.consumerGroup,
+               })
+               return
+       }
        cr := &ConsumeRequest{
                messageQueue: mq,
                processQueue: pq,
diff --git a/examples/consumer/pull/poll/main.go 
b/examples/consumer/pull/poll/main.go
index b6c783b..63d90f7 100644
--- a/examples/consumer/pull/poll/main.go
+++ b/examples/consumer/pull/poll/main.go
@@ -97,6 +97,8 @@ func poll() {
        }
        // todo LOGIC CODE HERE
        log.Println("msgList: ", cr.GetMsgList())
+       log.Println("messageQueue: ", cr.GetMQ())
+       log.Println("processQueue: ", cr.GetPQ())
        // pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
        pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
 }

Reply via email to