This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2b28c34 consumer consume include tag according to accumulation (#834)
2b28c34 is described below
commit 2b28c342e52929938c99db6f6c160b97ed9e2d0b
Author: wang1309 <[email protected]>
AuthorDate: Thu Jul 21 20:07:52 2022 +0800
consumer consume include tag according to accumulation (#834)
Co-authored-by: 王瑞 <[email protected]>
---
consumer/push_consumer.go | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d3f8545..4ce8fa4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -705,7 +705,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
sd := v.(*internal.SubscriptionData)
classFilter := sd.ClassFilterMode
- if pc.option.PostSubscriptionWhenPull && classFilter {
+ if pc.option.PostSubscriptionWhenPull && !classFilter {
subExpression = sd.SubString
}
@@ -719,7 +719,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
MaxMsgNums: pc.option.PullBatchSize,
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
- SubExpression: _SubAll,
+ SubExpression: subExpression,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
}