This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 30c1832bb83c00c946786a2b44cb073e5050150e
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Feb 15 17:22:11 2023 +0800

    Implement Client#OnVerifyMessageCommand and 
Client#OnPrintThreadStackTraceCommand
---
 csharp/rocketmq-client-csharp/Client.cs  | 43 +++++++++++++++++++++++++++-----
 csharp/rocketmq-client-csharp/Session.cs |  5 ++++
 2 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs 
b/csharp/rocketmq-client-csharp/Client.cs
index 89208db5..fc6871b9 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -171,7 +171,6 @@ namespace Org.Apache.Rocketmq
             OnTopicRouteDataUpdated0(topic, topicRouteData);
         }
 
-
         /**
          * Return all endpoints of brokers in route table.
          */
@@ -201,7 +200,8 @@ namespace Org.Apache.Rocketmq
             }
             catch (Exception e)
             {
-                Logger.Error(e, $"[Bug] unexpected exception raised during 
topic route cache update, clientId={ClientId}");
+                Logger.Error(e,
+                    $"[Bug] unexpected exception raised during topic route 
cache update, clientId={ClientId}");
             }
         }
 
@@ -324,14 +324,16 @@ namespace Org.Apache.Rocketmq
                         Logger.Info($"Send heartbeat successfully, 
endpoints={item}, clientId={ClientId}");
                         if (Isolated.TryRemove(item, out _))
                         {
-                            Logger.Info($"Rejoin endpoints which was isolated 
before, endpoints={item}, clientId={ClientId}");
+                            Logger.Info(
+                                $"Rejoin endpoints which was isolated before, 
endpoints={item}, clientId={ClientId}");
                         }
 
                         return;
                     }
 
                     var statusMessage = response.Status.Message;
-                    Logger.Info($"Failed to send heartbeat, endpoints={item}, 
code={code}, statusMessage={statusMessage}, clientId={ClientId}");
+                    Logger.Info(
+                        $"Failed to send heartbeat, endpoints={item}, 
code={code}, statusMessage={statusMessage}, clientId={ClientId}");
                 }
             }
             catch (Exception e)
@@ -390,14 +392,43 @@ namespace Org.Apache.Rocketmq
         public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
             Proto.RecoverOrphanedTransactionCommand command)
         {
+            // TODO
         }
 
-        public void OnVerifyMessageCommand(Endpoints endpoints, 
Proto.VerifyMessageCommand command)
+        public async void OnVerifyMessageCommand(Endpoints endpoints, 
Proto.VerifyMessageCommand command)
         {
+            // Only push consumers support message consumption verification.
+            Logger.Warn($"Ignore verify message command from remote, which is 
not expected, clientId={ClientId}, " +
+                        $"endpoints={endpoints}, command={command}");
+            var status = new Proto.Status
+            {
+                Code = Proto.Code.Unsupported,
+                Message = "Message consumption verification is not supported"
+            };
+            var telemetryCommand = new Proto.TelemetryCommand()
+            {
+                Status = status
+            };
+            var (_, session) = GetSession(endpoints);
+            await session.write(telemetryCommand);
         }
 
-        public void OnPrintThreadStackTraceCommand(Endpoints endpoints, 
Proto.PrintThreadStackTraceCommand command)
+        public async void OnPrintThreadStackTraceCommand(Endpoints endpoints,
+            Proto.PrintThreadStackTraceCommand command)
         {
+            Logger.Warn("Ignore thread stack trace printing command from 
remote because it is still not supported, " +
+                        $"clientId={ClientId}, endpoints={endpoints}");
+            var status = new Proto.Status
+            {
+                Code = Proto.Code.Unsupported,
+                Message = "C# don't support thread stack trace printing"
+            };
+            var telemetryCommand = new Proto.TelemetryCommand()
+            {
+                Status = status
+            };
+            var (_, session) = GetSession(endpoints);
+            await session.write(telemetryCommand);
         }
 
         public void OnSettingsCommand(Endpoints endpoints, Proto.Settings 
settings)
diff --git a/csharp/rocketmq-client-csharp/Session.cs 
b/csharp/rocketmq-client-csharp/Session.cs
index c948ef86..24ff9c7c 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -53,6 +53,11 @@ namespace Org.Apache.Rocketmq
             Loop();
         }
 
+        public async Task write(Proto.TelemetryCommand telemetryCommand)
+        {
+            var writer = _streamingCall.RequestStream;
+            await writer.WriteAsync(telemetryCommand);
+        } 
 
         public async Task SyncSettings(bool awaitResp)
         {

Reply via email to