This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8019e59  [ISSUE #487] Implement Unsubscribe method for push consumer 
(#626)
8019e59 is described below

commit 8019e59bf0cf7edb70da035d2529faf8573e908b
Author: 张旭 <[email protected]>
AuthorDate: Tue Mar 16 19:39:52 2021 +0800

    [ISSUE #487] Implement Unsubscribe method for push consumer (#626)
    
    * Implement Unsubscribe method for push consumer
---
 consumer/push_consumer.go      | 13 ++++++++++++-
 consumer/push_consumer_test.go | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8fb4637..393a0e4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -227,6 +227,14 @@ func (pc *pushConsumer) Subscribe(topic string, selector 
MessageSelector,
                return errors.New("cannot subscribe topic since client either 
failed to start or has been shutdown.")
        }
 
+       // add retry topic subscription for resubscribe
+       retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+       _, exists := pc.subscriptionDataTable.Load(retryTopic)
+       if !exists {
+               sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, 
_SubAll})
+               pc.subscriptionDataTable.Store(retryTopic, sub)
+       }
+
        if pc.option.Namespace != "" {
                topic = pc.option.Namespace + "%" + topic
        }
@@ -241,7 +249,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector 
MessageSelector,
        return nil
 }
 
-func (pc *pushConsumer) Unsubscribe(string) error {
+func (pc *pushConsumer) Unsubscribe(topic string) error {
+       pc.subscriptionDataTable.Delete(topic)
+       retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+       pc.subscriptionDataTable.Delete(retryTopic)
        return nil
 }
 
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index 4789e9b..e67b2db 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -52,6 +52,23 @@ func TestStart(t *testing.T) {
                        return ConsumeSuccess, nil
                })
 
+               _, exists := c.subscriptionDataTable.Load("TopicTest")
+               So(exists, ShouldBeTrue)
+
+               err = c.Unsubscribe("TopicTest")
+               So(err, ShouldBeNil)
+               _, exists = c.subscriptionDataTable.Load("TopicTest")
+               So(exists, ShouldBeFalse)
+
+               err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx 
context.Context,
+                       msgs ...*primitive.MessageExt) (ConsumeResult, error) {
+                       fmt.Printf("subscribe callback: %v \n", msgs)
+                       return ConsumeSuccess, nil
+               })
+
+               _, exists = c.subscriptionDataTable.Load("TopicTest")
+               So(exists, ShouldBeTrue)
+
                client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
                client.EXPECT().Start().Return()
                client.EXPECT().RegisterConsumer(gomock.Any(), 
gomock.Any()).Return(nil)

Reply via email to