This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 8f4c0c4d99decaabcc6f69941a8d5960dddb1595 Author: colprog <[email protected]> AuthorDate: Tue Dec 20 23:12:49 2022 +0800 Add missing unsubscribe in dotnet SimpleConsumer --- csharp/rocketmq-client-csharp/Client.cs | 15 ++++++++------- csharp/rocketmq-client-csharp/Producer.cs | 4 ++-- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 6 ++++++ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 217479d9..bd9fdef1 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -148,11 +148,7 @@ namespace Org.Apache.Rocketmq private async Task UpdateTopicRoute() { - HashSet<string> topics = new HashSet<string>(); - foreach (var topic in _topicsOfInterest) - { - topics.Add(topic); - } + HashSet<string> topics = new HashSet<string>(_topicsOfInterest.Keys); foreach (var item in _topicRouteTable) { @@ -518,11 +514,16 @@ namespace Org.Apache.Rocketmq protected readonly IClientManager Manager; - protected readonly HashSet<string> _topicsOfInterest = new HashSet<string>(); + protected readonly ConcurrentDictionary<string, bool> _topicsOfInterest = new (); public void AddTopicOfInterest(string topic) { - _topicsOfInterest.Add(topic); + _topicsOfInterest.TryAdd(topic, true); + } + + public void RemoveTopicOfInterest(string topic) + { + _topicsOfInterest.TryRemove(topic, out var _); } private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable; diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 77016571..39e39b8e 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -94,7 +94,7 @@ namespace Org.Apache.Rocketmq { var resource = new rmq.Resource() { - Name = topic, + Name = topic.Key, ResourceNamespace = ResourceNamespace }; publishing.Topics.Add(resource); @@ -105,7 +105,7 @@ namespace Org.Apache.Rocketmq public async Task<SendReceipt> Send(Message message) { - _topicsOfInterest.Add(message.Topic); + _topicsOfInterest.TryAdd(message.Topic, true); if (!_loadBalancer.TryGetValue(message.Topic, out var publishLb)) { diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index 12e5d8cf..efe38211 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -180,6 +180,12 @@ namespace Org.Apache.Rocketmq AddTopicOfInterest(topic); } + public void Unsubscribe(string topic) + { + _subscriptions.TryRemove(topic, out var _); + RemoveTopicOfInterest(topic); + } + internal override void OnSettingsReceived(rmq.Settings settings) { base.OnSettingsReceived(settings);
