This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit e272ee3da21560ae86ab560f6bd0bd9243ee2047 Author: Aaron Ai <[email protected]> AuthorDate: Wed Feb 22 17:43:22 2023 +0800 Add nonce for TelemetryCommand --- csharp/rocketmq-client-csharp/Client.cs | 38 ++++++++++++++++----------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 24e1e4ac..e0a4c553 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -42,7 +42,7 @@ namespace Org.Apache.Rocketmq private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1); private readonly CancellationTokenSource _settingsSyncCts; - + private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60); private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(60); private readonly CancellationTokenSource _statsCts; @@ -201,13 +201,13 @@ namespace Org.Apache.Rocketmq { Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); Dictionary<string, Task<TopicRouteData>> responses = new(); - + foreach (var topic in GetTopics()) { var task = FetchTopicRoute(topic); responses[topic] = task; } - + foreach (var item in responses.Keys) { try @@ -219,7 +219,7 @@ namespace Org.Apache.Rocketmq Logger.Error(e, $"Failed to update topic route cache, topic={item}"); } } - + foreach (var topic in GetTopics()) { await FetchTopicRoute(topic); @@ -242,7 +242,6 @@ namespace Org.Apache.Rocketmq var (_, session) = GetSession(endpoints); await session.SyncSettings(false); Logger.Info($"Sync settings to remote, endpoints={endpoints}"); - } } catch (Exception e) @@ -422,22 +421,11 @@ namespace Org.Apache.Rocketmq return ClientConfig; } - public virtual async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, + public virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command) { Logger.Warn($"Ignore orphaned transaction recovery command from remote, which is not expected, " + $"clientId={ClientId}, endpoints={endpoints}"); - var status = new Proto.Status - { - Code = Proto.Code.InternalError, - Message = "Current client don't support transaction message recovery" - }; - var telemetryCommand = new Proto.TelemetryCommand - { - Status = status - }; - var (_, session) = GetSession(endpoints); - await session.WriteAsync(telemetryCommand); } public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) @@ -450,8 +438,14 @@ namespace Org.Apache.Rocketmq Code = Proto.Code.Unsupported, Message = "Message consumption verification is not supported" }; + var verifyMessageResult = new Proto.VerifyMessageResult + { + Nonce = command.Nonce + }; + var telemetryCommand = new Proto.TelemetryCommand { + VerifyMessageResult = verifyMessageResult, Status = status }; var (_, session) = GetSession(endpoints); @@ -468,9 +462,15 @@ namespace Org.Apache.Rocketmq Code = Proto.Code.Unsupported, Message = "C# don't support thread stack trace printing" }; - var telemetryCommand = new Proto.TelemetryCommand() + var threadStackTrace = new Proto.ThreadStackTrace { - Status = status + Nonce = command.Nonce + }; + + var telemetryCommand = new Proto.TelemetryCommand + { + ThreadStackTrace = threadStackTrace, + Status = status, }; var (_, session) = GetSession(endpoints); await session.WriteAsync(telemetryCommand);
