This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit e272ee3da21560ae86ab560f6bd0bd9243ee2047
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Feb 22 17:43:22 2023 +0800

    Add nonce for TelemetryCommand
---
 csharp/rocketmq-client-csharp/Client.cs | 38 ++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs 
b/csharp/rocketmq-client-csharp/Client.cs
index 24e1e4ac..e0a4c553 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -42,7 +42,7 @@ namespace Org.Apache.Rocketmq
         private static readonly TimeSpan SettingsSyncScheduleDelay = 
TimeSpan.FromSeconds(1);
         private static readonly TimeSpan SettingsSyncSchedulePeriod = 
TimeSpan.FromSeconds(1);
         private readonly CancellationTokenSource _settingsSyncCts;
-        
+
         private static readonly TimeSpan StatsScheduleDelay = 
TimeSpan.FromSeconds(60);
         private static readonly TimeSpan StatsSchedulePeriod = 
TimeSpan.FromSeconds(60);
         private readonly CancellationTokenSource _statsCts;
@@ -201,13 +201,13 @@ namespace Org.Apache.Rocketmq
             {
                 Logger.Info($"Start to update topic route cache for a new 
round, clientId={ClientId}");
                 Dictionary<string, Task<TopicRouteData>> responses = new();
-                
+
                 foreach (var topic in GetTopics())
                 {
                     var task = FetchTopicRoute(topic);
                     responses[topic] = task;
                 }
-                
+
                 foreach (var item in responses.Keys)
                 {
                     try
@@ -219,7 +219,7 @@ namespace Org.Apache.Rocketmq
                         Logger.Error(e, $"Failed to update topic route cache, 
topic={item}");
                     }
                 }
-                
+
                 foreach (var topic in GetTopics())
                 {
                     await FetchTopicRoute(topic);
@@ -242,7 +242,6 @@ namespace Org.Apache.Rocketmq
                     var (_, session) = GetSession(endpoints);
                     await session.SyncSettings(false);
                     Logger.Info($"Sync settings to remote, 
endpoints={endpoints}");
-
                 }
             }
             catch (Exception e)
@@ -422,22 +421,11 @@ namespace Org.Apache.Rocketmq
             return ClientConfig;
         }
 
-        public virtual async void 
OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
+        public virtual void OnRecoverOrphanedTransactionCommand(Endpoints 
endpoints,
             Proto.RecoverOrphanedTransactionCommand command)
         {
             Logger.Warn($"Ignore orphaned transaction recovery command from 
remote, which is not expected, " +
                         $"clientId={ClientId}, endpoints={endpoints}");
-            var status = new Proto.Status
-            {
-                Code = Proto.Code.InternalError,
-                Message = "Current client don't support transaction message 
recovery"
-            };
-            var telemetryCommand = new Proto.TelemetryCommand
-            {
-                Status = status
-            };
-            var (_, session) = GetSession(endpoints);
-            await session.WriteAsync(telemetryCommand);
         }
 
         public async void OnVerifyMessageCommand(Endpoints endpoints, 
Proto.VerifyMessageCommand command)
@@ -450,8 +438,14 @@ namespace Org.Apache.Rocketmq
                 Code = Proto.Code.Unsupported,
                 Message = "Message consumption verification is not supported"
             };
+            var verifyMessageResult = new Proto.VerifyMessageResult
+            {
+                Nonce = command.Nonce
+            };
+
             var telemetryCommand = new Proto.TelemetryCommand
             {
+                VerifyMessageResult = verifyMessageResult,
                 Status = status
             };
             var (_, session) = GetSession(endpoints);
@@ -468,9 +462,15 @@ namespace Org.Apache.Rocketmq
                 Code = Proto.Code.Unsupported,
                 Message = "C# don't support thread stack trace printing"
             };
-            var telemetryCommand = new Proto.TelemetryCommand()
+            var threadStackTrace = new Proto.ThreadStackTrace
             {
-                Status = status
+                Nonce = command.Nonce
+            };
+
+            var telemetryCommand = new Proto.TelemetryCommand
+            {
+                ThreadStackTrace = threadStackTrace,
+                Status = status,
             };
             var (_, session) = GetSession(endpoints);
             await session.WriteAsync(telemetryCommand);

Reply via email to