This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 7ce29d7d70d2b9e63dc94c35d53ca3dcf902df26 Author: Aaron Ai <[email protected]> AuthorDate: Mon Feb 13 20:38:04 2023 +0800 Notify remote endpoints that current client is terminated --- csharp/rocketmq-client-csharp/Client.cs | 13 +++++++------ csharp/rocketmq-client-csharp/Producer.cs | 5 +++++ csharp/rocketmq-client-csharp/SimpleConsumer.cs | 8 ++++++++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 9d4d7e69..4849857b 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -94,6 +94,7 @@ namespace Org.Apache.Rocketmq _topicRouteUpdateCtx.Cancel(); _heartbeatCts.Cancel(); _telemetryCts.Cancel(); + NotifyClientTermination(); await ClientManager.Shutdown(); Logger.Debug($"Shutdown the rocketmq client successfully, clientId={ClientId}"); } @@ -299,7 +300,7 @@ namespace Org.Apache.Rocketmq if (Isolated.TryRemove(item, out _)) { Logger.Info( - $"Rejoin endpoints which was isolate before, endpoints={item}, clientId={ClientId}"); + $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}"); } return; @@ -319,13 +320,13 @@ namespace Org.Apache.Rocketmq return metadata; } - public async void NotifyClientTermination(Proto.Resource group) + protected abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest(); + + private async void NotifyClientTermination() { + Logger.Info($"Notify remote endpoints that current client is terminated, clientId={ClientId}"); var endpoints = GetTotalRouteEndpoints(); - var request = new Proto::NotifyClientTerminationRequest - { - Group = group - }; + var request = WrapNotifyClientTerminationRequest(); foreach (var item in endpoints) { var response = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout); diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 170dbeab..4bbc4bfa 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -88,6 +88,11 @@ namespace Org.Apache.Rocketmq }; } + protected override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() + { + return new Proto::NotifyClientTerminationRequest(); + } + private async Task<PublishingLoadBalancer> GetPublishingLoadBalancer(string topic) { if (_publishingRouteDataCache.TryGetValue(topic, out var publishingLoadBalancer)) diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index 8a8922fd..cdd5aa64 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -84,6 +84,14 @@ namespace Org.Apache.Rocketmq return _subscriptionExpressions.Keys; } + protected override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() + { + return new Proto.NotifyClientTerminationRequest() + { + Group = GetProtobufGroup() + }; + } + protected override Proto.HeartbeatRequest WrapHeartbeatRequest() { return new Proto::HeartbeatRequest
