This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b28c34  consumer consume include tag according to accumulation (#834)
2b28c34 is described below

commit 2b28c342e52929938c99db6f6c160b97ed9e2d0b
Author: wang1309 <[email protected]>
AuthorDate: Thu Jul 21 20:07:52 2022 +0800

    consumer consume include tag according to accumulation (#834)
    
    Co-authored-by: 王瑞 <[email protected]>
---
 consumer/push_consumer.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d3f8545..4ce8fa4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -705,7 +705,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 
                sd := v.(*internal.SubscriptionData)
                classFilter := sd.ClassFilterMode
-               if pc.option.PostSubscriptionWhenPull && classFilter {
+               if pc.option.PostSubscriptionWhenPull && !classFilter {
                        subExpression = sd.SubString
                }
 
@@ -719,7 +719,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        MaxMsgNums:           pc.option.PullBatchSize,
                        SysFlag:              sysFlag,
                        CommitOffset:         commitOffsetValue,
-                       SubExpression:        _SubAll,
+                       SubExpression:        subExpression,
                        ExpressionType:       string(TAG),
                        SuspendTimeoutMillis: 20 * time.Second,
                }

Reply via email to