This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c28e0771 fix(golang): add filter type in simple consumer warp request (#531)
c28e0771 is described below
commit c28e0771dd952ad317b01053f4a3bfbc0bb66474
Author: nvac <[email protected]>
AuthorDate: Thu Jun 1 19:12:19 2023 +0800
fix(golang): add filter type in simple consumer warp request (#531)
---
golang/simple_consumer.go | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index dfc72320..8abfb413 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -154,6 +154,16 @@ func (sc *defaultSimpleConsumer) Unsubscribe(topic string) error {
}
func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int,
messageQueue *v2.MessageQueue, filterExpression *FilterExpression,
invisibleDuration time.Duration) *v2.ReceiveMessageRequest {
+ var filterType v2.FilterType
+ switch filterExpression.expressionType {
+ case SQL92:
+ filterType = v2.FilterType_SQL
+ case TAG:
+ filterType = v2.FilterType_TAG
+ default:
+ filterType = v2.FilterType_FILTER_TYPE_UNSPECIFIED
+ }
+
return &v2.ReceiveMessageRequest{
Group: &v2.Resource{
Name: sc.groupName,
@@ -161,6 +171,7 @@ func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, messag
MessageQueue: messageQueue,
FilterExpression: &v2.FilterExpression{
Expression: filterExpression.expression,
+ Type: filterType,
},
BatchSize: int32(batchSize),
InvisibleDuration: durationpb.New(invisibleDuration),