This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3f06d9a when queue dropped,should'not consume the process
queue.(#905) (#906)
3f06d9a is described below
commit 3f06d9ad2161ad900177f78ceadbe5f3647c1fc7
Author: Nick <[email protected]>
AuthorDate: Thu Sep 1 19:52:22 2022 +0800
when queue dropped,should'not consume the process queue.(#905) (#906)
* when queue dropped,should'not consume the process queue.(#905)
* poll method use private properties
---
consumer/pull_consumer.go | 23 +++++++++++++++++++++++
examples/consumer/pull/poll/main.go | 2 ++
2 files changed, 25 insertions(+)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 1ca4881..6c24c63 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -56,6 +56,14 @@ func (cr *ConsumeRequest) GetMsgList()
[]*primitive.MessageExt {
return cr.msgList
}
+func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue {
+ return cr.messageQueue
+}
+
+func (cr *ConsumeRequest) GetPQ() *processQueue {
+ return cr.processQueue
+}
+
type defaultPullConsumer struct {
*defaultConsumer
@@ -193,6 +201,14 @@ func (pc *defaultPullConsumer) Poll(ctx context.Context,
timeout time.Duration)
case <-ctx.Done():
return nil, ErrNoNewMsg
case cr := <-pc.consumeRequestCache:
+ if cr.processQueue.IsDroppd() {
+ rlog.Info("defaultPullConsumer poll the message queue
not be able to consume, because it was dropped", map[string]interface{}{
+ rlog.LogKeyMessageQueue:
cr.messageQueue.String(),
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ })
+ return nil, ErrNoNewMsg
+ }
+
if len(cr.GetMsgList()) == 0 {
return nil, ErrNoNewMsg
}
@@ -780,6 +796,13 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq
*processQueue, mq *pri
if msgList == nil {
return
}
+ if pq.IsDroppd() {
+ rlog.Info("defaultPullConsumer consumeMessageCurrently the
message queue not be able to consume, because it was dropped",
map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ })
+ return
+ }
cr := &ConsumeRequest{
messageQueue: mq,
processQueue: pq,
diff --git a/examples/consumer/pull/poll/main.go
b/examples/consumer/pull/poll/main.go
index b6c783b..63d90f7 100644
--- a/examples/consumer/pull/poll/main.go
+++ b/examples/consumer/pull/poll/main.go
@@ -97,6 +97,8 @@ func poll() {
}
// todo LOGIC CODE HERE
log.Println("msgList: ", cr.GetMsgList())
+ log.Println("messageQueue: ", cr.GetMQ())
+ log.Println("processQueue: ", cr.GetPQ())
// pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
}