This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit d70721197113c66f063acd40efa64395569236d4 Author: Aaron Ai <[email protected]> AuthorDate: Fri Feb 17 11:57:51 2023 +0800 Polish code --- csharp/examples/ProducerBenchmark.cs | 7 +-- csharp/rocketmq-client-csharp/Client.cs | 86 +++++++++++++++++---------- csharp/rocketmq-client-csharp/MqLogManager.cs | 13 ++-- csharp/rocketmq-client-csharp/Session.cs | 9 ++- csharp/rocketmq-client-csharp/State.cs | 24 ++++++++ 5 files changed, 90 insertions(+), 49 deletions(-) diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs index 3918666d..6f94028e 100644 --- a/csharp/examples/ProducerBenchmark.cs +++ b/csharp/examples/ProducerBenchmark.cs @@ -68,14 +68,14 @@ namespace examples Keys = keys }; - const int tpsLimit = 500; + const int tpsLimit = 1; Task.Run(async () => { while (true) { _semaphore.Release(tpsLimit); - await Task.Delay(TimeSpan.FromMilliseconds(1000)); + await Task.Delay(TimeSpan.FromSeconds(1)); } }); @@ -83,8 +83,7 @@ namespace examples { while (true) { - Logger.Info($"Send {_counter} messages successfully."); - Interlocked.Exchange(ref _counter, 0); + Logger.Info($"Send {Interlocked.Exchange(ref _counter, 0)} messages successfully."); await Task.Delay(TimeSpan.FromSeconds(1)); } }); diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 9311b48a..5a2cd69f 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -40,11 +40,11 @@ namespace Org.Apache.Rocketmq private readonly CancellationTokenSource _topicRouteUpdateCts; private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1); - private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5); + private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1); private readonly CancellationTokenSource _settingsSyncCts; - private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(1); - private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(1); + private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60); + private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(60); private readonly CancellationTokenSource _statsCts; protected readonly ClientConfig ClientConfig; @@ -201,6 +201,26 @@ namespace Org.Apache.Rocketmq try { Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); + Dictionary<string, Task<TopicRouteData>> responses = new(); + + foreach (var topic in GetTopics()) + { + var task = FetchTopicRoute(topic); + responses[topic] = task; + } + + foreach (var item in responses.Keys) + { + try + { + await responses[item]; + } + catch (Exception e) + { + Logger.Error(e, $"Failed to update topic route cache, topic={item}"); + } + } + foreach (var topic in GetTopics()) { await FetchTopicRoute(topic); @@ -208,8 +228,8 @@ namespace Org.Apache.Rocketmq } catch (Exception e) { - Logger.Error(e, - $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}"); + Logger.Error(e, $"[Bug] unexpected exception raised during topic route cache update, " + + $"clientId={ClientId}"); } } @@ -218,9 +238,12 @@ namespace Org.Apache.Rocketmq try { var totalRouteEndpoints = GetTotalRouteEndpoints(); - foreach (var (_, session) in totalRouteEndpoints.Select(GetSession)) + foreach (var endpoints in totalRouteEndpoints) { + var (_, session) = GetSession(endpoints); await session.SyncSettings(false); + Logger.Info($"Sync settings to remote, endpoints={endpoints}"); + } } catch (Exception e) @@ -319,38 +342,37 @@ namespace Org.Apache.Rocketmq // Collect task into a map. foreach (var item in endpoints) { - try - { - var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout); - responses[item] = task; - } - catch (Exception e) - { - Logger.Error(e, $"Failed to send heartbeat, endpoints={item}"); - } + var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout); + responses[item] = task; } - foreach (var item in responses.Keys) { - var response = await responses[item]; - var code = response.Status.Code; - - if (code.Equals(Proto.Code.Ok)) + try { - Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); - if (Isolated.TryRemove(item, out _)) + var response = await responses[item]; + var code = response.Status.Code; + + if (code.Equals(Proto.Code.Ok)) { - Logger.Info( - $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}"); + Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); + if (Isolated.TryRemove(item, out _)) + { + Logger.Info($"Rejoin endpoints which was isolated before, endpoints={item}, " + + $"clientId={ClientId}"); + } + + return; } - return; + var statusMessage = response.Status.Message; + Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, " + + $"statusMessage={statusMessage}, clientId={ClientId}"); + } + catch (Exception e) + { + Logger.Error(e, $"Failed to send heartbeat, endpoints={item}"); } - - var statusMessage = response.Status.Message; - Logger.Info( - $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); } } catch (Exception e) @@ -421,7 +443,7 @@ namespace Org.Apache.Rocketmq Status = status }; var (_, session) = GetSession(endpoints); - await session.write(telemetryCommand); + await session.WriteAsync(telemetryCommand); } public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) @@ -439,7 +461,7 @@ namespace Org.Apache.Rocketmq Status = status }; var (_, session) = GetSession(endpoints); - await session.write(telemetryCommand); + await session.WriteAsync(telemetryCommand); } public async void OnPrintThreadStackTraceCommand(Endpoints endpoints, @@ -457,7 +479,7 @@ namespace Org.Apache.Rocketmq Status = status }; var (_, session) = GetSession(endpoints); - await session.write(telemetryCommand); + await session.WriteAsync(telemetryCommand); } public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings) diff --git a/csharp/rocketmq-client-csharp/MqLogManager.cs b/csharp/rocketmq-client-csharp/MqLogManager.cs index 7fa2b7bf..6b117ea5 100644 --- a/csharp/rocketmq-client-csharp/MqLogManager.cs +++ b/csharp/rocketmq-client-csharp/MqLogManager.cs @@ -28,22 +28,19 @@ namespace Org.Apache.Rocketmq * * Configure component logging, please refer to https://github.com/NLog/NLog/wiki/Configure-component-logging */ - public class MqLogManager + public static class MqLogManager { - public static LogFactory Instance - { - get { return LazyInstance.Value; } - } + public static LogFactory Instance => LazyInstance.Value; private static readonly Lazy<LogFactory> LazyInstance = new(BuildLogFactory); private static LogFactory BuildLogFactory() { // Use name of current assembly to construct NLog config filename - Assembly thisAssembly = Assembly.GetExecutingAssembly(); - string configFilePath = Path.ChangeExtension(thisAssembly.Location, ".nlog"); + var thisAssembly = Assembly.GetExecutingAssembly(); + var configFilePath = Path.ChangeExtension(thisAssembly.Location, ".nlog"); - LogFactory logFactory = new LogFactory(); + var logFactory = new LogFactory(); logFactory.Configuration = new XmlLoggingConfiguration(configFilePath, logFactory); return logFactory; } diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs index 0d35be0a..92465291 100644 --- a/csharp/rocketmq-client-csharp/Session.cs +++ b/csharp/rocketmq-client-csharp/Session.cs @@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3); - private readonly grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> + private readonly AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> _streamingCall; private readonly IClient _client; @@ -56,26 +56,25 @@ namespace Org.Apache.Rocketmq Loop(); } - public async Task write(Proto.TelemetryCommand telemetryCommand) + public async Task WriteAsync(Proto.TelemetryCommand telemetryCommand) { var writer = _streamingCall.RequestStream; await writer.WriteAsync(telemetryCommand); } + // TODO: Test concurrency. public async Task SyncSettings(bool awaitResp) { // Add more buffer time. await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout)); try { - var writer = _streamingCall.RequestStream; - // await readTask; var settings = _client.GetSettings(); var telemetryCommand = new Proto.TelemetryCommand { Settings = settings.ToProtobuf() }; - await writer.WriteAsync(telemetryCommand); + await WriteAsync(telemetryCommand); // await writer.CompleteAsync(); if (awaitResp) { diff --git a/csharp/rocketmq-client-csharp/State.cs b/csharp/rocketmq-client-csharp/State.cs new file mode 100644 index 00000000..1dbd6b30 --- /dev/null +++ b/csharp/rocketmq-client-csharp/State.cs @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public enum State + { + + } +} \ No newline at end of file
