This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 30c1832bb83c00c946786a2b44cb073e5050150e Author: Aaron Ai <[email protected]> AuthorDate: Wed Feb 15 17:22:11 2023 +0800 Implement Client#OnVerrifyMessageCommand and Client#OnPrintThreadStackTraceCommand --- csharp/rocketmq-client-csharp/Client.cs | 43 +++++++++++++++++++++++++++----- csharp/rocketmq-client-csharp/Session.cs | 5 ++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 89208db5..fc6871b9 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -171,7 +171,6 @@ namespace Org.Apache.Rocketmq OnTopicRouteDataUpdated0(topic, topicRouteData); } - /** * Return all endpoints of brokers in route table. */ @@ -201,7 +200,8 @@ namespace Org.Apache.Rocketmq } catch (Exception e) { - Logger.Error(e, $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}"); + Logger.Error(e, + $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}"); } } @@ -324,14 +324,16 @@ namespace Org.Apache.Rocketmq Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); if (Isolated.TryRemove(item, out _)) { - Logger.Info($"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}"); + Logger.Info( + $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}"); } return; } var statusMessage = response.Status.Message; - Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); + Logger.Info( + $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); } } catch (Exception e) @@ -390,14 +392,43 @@ namespace Org.Apache.Rocketmq public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command) { + // TODO } - public void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) + public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) { + // Only push consumer support message consumption verification. + Logger.Warn($"Ignore verify message command from remote, which is not expected, clientId={ClientId}, " + + $"endpoints={endpoints}, command={command}"); + var status = new Proto.Status + { + Code = Proto.Code.Unsupported, + Message = "Message consumption verification is not supported" + }; + var telemetryCommand = new Proto.TelemetryCommand() + { + Status = status + }; + var (_, session) = GetSession(endpoints); + await session.write(telemetryCommand); } - public void OnPrintThreadStackTraceCommand(Endpoints endpoints, Proto.PrintThreadStackTraceCommand command) + public async void OnPrintThreadStackTraceCommand(Endpoints endpoints, + Proto.PrintThreadStackTraceCommand command) { + Logger.Warn("Ignore thread stack trace printing command from remote because it is still not supported, " + + $"clientId={ClientId}, endpoints={endpoints}"); + var status = new Proto.Status + { + Code = Proto.Code.Unsupported, + Message = "C# don't support thread stack trace printing" + }; + var telemetryCommand = new Proto.TelemetryCommand() + { + Status = status + }; + var (_, session) = GetSession(endpoints); + await session.write(telemetryCommand); } public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings) diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs index c948ef86..24ff9c7c 100644 --- a/csharp/rocketmq-client-csharp/Session.cs +++ b/csharp/rocketmq-client-csharp/Session.cs @@ -53,6 +53,11 @@ namespace Org.Apache.Rocketmq Loop(); } + public async Task write(Proto.TelemetryCommand telemetryCommand) + { + var writer = _streamingCall.RequestStream; + await writer.WriteAsync(telemetryCommand); + } public async Task SyncSettings(bool awaitResp) {
