This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8019e59 [ISSUE #487] Implement Unsubscribe method for push consumer
(#626)
8019e59 is described below
commit 8019e59bf0cf7edb70da035d2529faf8573e908b
Author: 张旭 <[email protected]>
AuthorDate: Tue Mar 16 19:39:52 2021 +0800
[ISSUE #487] Implement Unsubscribe method for push consumer (#626)
* Implement Unsubscribe method for push consumer
---
consumer/push_consumer.go | 13 ++++++++++++-
consumer/push_consumer_test.go | 17 +++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8fb4637..393a0e4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -227,6 +227,14 @@ func (pc *pushConsumer) Subscribe(topic string, selector
MessageSelector,
return errors.New("cannot subscribe topic since client either
failed to start or has been shutdown.")
}
+ // add retry topic subscription for resubscribe
+ retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+ _, exists := pc.subscriptionDataTable.Load(retryTopic)
+ if !exists {
+ sub := buildSubscriptionData(retryTopic, MessageSelector{TAG,
_SubAll})
+ pc.subscriptionDataTable.Store(retryTopic, sub)
+ }
+
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
@@ -241,7 +249,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector
MessageSelector,
return nil
}
-func (pc *pushConsumer) Unsubscribe(string) error {
+func (pc *pushConsumer) Unsubscribe(topic string) error {
+ pc.subscriptionDataTable.Delete(topic)
+ retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+ pc.subscriptionDataTable.Delete(retryTopic)
return nil
}
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index 4789e9b..e67b2db 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -52,6 +52,23 @@ func TestStart(t *testing.T) {
return ConsumeSuccess, nil
})
+ _, exists := c.subscriptionDataTable.Load("TopicTest")
+ So(exists, ShouldBeTrue)
+
+ err = c.Unsubscribe("TopicTest")
+ So(err, ShouldBeNil)
+ _, exists = c.subscriptionDataTable.Load("TopicTest")
+ So(exists, ShouldBeFalse)
+
+ err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx
context.Context,
+ msgs ...*primitive.MessageExt) (ConsumeResult, error) {
+ fmt.Printf("subscribe callback: %v \n", msgs)
+ return ConsumeSuccess, nil
+ })
+
+ _, exists = c.subscriptionDataTable.Load("TopicTest")
+ So(exists, ShouldBeTrue)
+
client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
client.EXPECT().Start().Return()
client.EXPECT().RegisterConsumer(gomock.Any(),
gomock.Any()).Return(nil)