This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c28e0771 fix(golang): add filter type in simple consumer warp request 
(#531)
c28e0771 is described below

commit c28e0771dd952ad317b01053f4a3bfbc0bb66474
Author: nvac <[email protected]>
AuthorDate: Thu Jun 1 19:12:19 2023 +0800

    fix(golang): add filter type in simple consumer warp request (#531)
---
 golang/simple_consumer.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index dfc72320..8abfb413 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -154,6 +154,16 @@ func (sc *defaultSimpleConsumer) Unsubscribe(topic string) 
error {
 }
 
 func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, 
messageQueue *v2.MessageQueue, filterExpression *FilterExpression, 
invisibleDuration time.Duration) *v2.ReceiveMessageRequest {
+       var filterType v2.FilterType
+       switch filterExpression.expressionType {
+       case SQL92:
+               filterType = v2.FilterType_SQL
+       case TAG:
+               filterType = v2.FilterType_TAG
+       default:
+               filterType = v2.FilterType_FILTER_TYPE_UNSPECIFIED
+       }
+
        return &v2.ReceiveMessageRequest{
                Group: &v2.Resource{
                        Name: sc.groupName,
@@ -161,6 +171,7 @@ func (sc *defaultSimpleConsumer) 
wrapReceiveMessageRequest(batchSize int, messag
                MessageQueue: messageQueue,
                FilterExpression: &v2.FilterExpression{
                        Expression: filterExpression.expression,
+                       Type:       filterType,
                },
                BatchSize:         int32(batchSize),
                InvisibleDuration: durationpb.New(invisibleDuration),

Reply via email to