This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 7ce29d7d70d2b9e63dc94c35d53ca3dcf902df26
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Feb 13 20:38:04 2023 +0800

    Notify remote endpoints that current client is terminated
---
 csharp/rocketmq-client-csharp/Client.cs         | 13 +++++++------
 csharp/rocketmq-client-csharp/Producer.cs       |  5 +++++
 csharp/rocketmq-client-csharp/SimpleConsumer.cs |  8 ++++++++
 3 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 9d4d7e69..4849857b 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -94,6 +94,7 @@ namespace Org.Apache.Rocketmq
             _topicRouteUpdateCtx.Cancel();
             _heartbeatCts.Cancel();
             _telemetryCts.Cancel();
+            NotifyClientTermination();
             await ClientManager.Shutdown();
             Logger.Debug($"Shutdown the rocketmq client successfully, 
clientId={ClientId}");
         }
@@ -299,7 +300,7 @@ namespace Org.Apache.Rocketmq
                     if (Isolated.TryRemove(item, out _))
                     {
                         Logger.Info(
-                            $"Rejoin endpoints which was isolate before, 
endpoints={item}, clientId={ClientId}");
+                            $"Rejoin endpoints which was isolated before, 
endpoints={item}, clientId={ClientId}");
                     }
 
                     return;
@@ -319,13 +320,13 @@ namespace Org.Apache.Rocketmq
             return metadata;
         }
 
-        public async void NotifyClientTermination(Proto.Resource group)
+        protected abstract Proto::NotifyClientTerminationRequest 
WrapNotifyClientTerminationRequest();
+
+        private async void NotifyClientTermination()
         {
+            Logger.Info($"Notify remote endpoints that current client is 
terminated, clientId={ClientId}");
             var endpoints = GetTotalRouteEndpoints();
-            var request = new Proto::NotifyClientTerminationRequest
-            {
-                Group = group
-            };
+            var request = WrapNotifyClientTerminationRequest();
             foreach (var item in endpoints)
             {
                 var response = await 
ClientManager.NotifyClientTermination(item, request, 
ClientConfig.RequestTimeout);
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 170dbeab..4bbc4bfa 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -88,6 +88,11 @@ namespace Org.Apache.Rocketmq
             };
         }
 
+        protected override Proto::NotifyClientTerminationRequest 
WrapNotifyClientTerminationRequest()
+        {
+            return new Proto::NotifyClientTerminationRequest();
+        }
+
         private async Task<PublishingLoadBalancer> 
GetPublishingLoadBalancer(string topic)
         {
             if (_publishingRouteDataCache.TryGetValue(topic, out var 
publishingLoadBalancer))
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 8a8922fd..cdd5aa64 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -84,6 +84,14 @@ namespace Org.Apache.Rocketmq
             return _subscriptionExpressions.Keys;
         }
 
+        protected override Proto.NotifyClientTerminationRequest 
WrapNotifyClientTerminationRequest()
+        {
+            return new Proto.NotifyClientTerminationRequest()
+            {
+                Group = GetProtobufGroup()
+            };
+        }
+
         protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
         {
             return new Proto::HeartbeatRequest

Reply via email to