This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 6cde562e368dbe947a009fc4cba3bec252a2b649 Author: Aaron Ai <[email protected]> AuthorDate: Wed Feb 15 15:54:03 2023 +0800 Add error log for scheduled task --- csharp/examples/ProducerBenchmark.cs | 12 ++-- csharp/examples/SimpleConsumerExample.cs | 3 +- csharp/rocketmq-client-csharp/Client.cs | 84 ++++++++++++++-------- .../ClientLoggerInterceptor.cs | 14 +--- csharp/rocketmq-client-csharp/Session.cs | 1 + 5 files changed, 68 insertions(+), 46 deletions(-) diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs index 8ad03847..4e334104 100644 --- a/csharp/examples/ProducerBenchmark.cs +++ b/csharp/examples/ProducerBenchmark.cs @@ -34,12 +34,12 @@ namespace examples internal static void QuickStart() { - const string accessKey = "5jFk0wK7OU6Uq395"; - const string secretKey = "V1u8z19URHs4o6RQ"; + const string accessKey = "amKhwEM40L61znSz"; + const string secretKey = "bT6c3gpF3EFB10F3"; // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080"; var clientConfig = new ClientConfig(endpoints) { CredentialsProvider = credentialsProvider @@ -68,14 +68,14 @@ namespace examples Keys = keys }; - const int tpsLimit = 1000; + const int tpsLimit = 800; Task.Run(async () => { while (true) { - _semaphore.Release(tpsLimit/1000); - await Task.Delay(TimeSpan.FromMilliseconds(1)); + _semaphore.Release(tpsLimit); + await Task.Delay(TimeSpan.FromMilliseconds(1000)); } }); diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs index b41125c8..fa78e845 100644 --- a/csharp/examples/SimpleConsumerExample.cs +++ b/csharp/examples/SimpleConsumerExample.cs @@ -41,8 +41,9 @@ namespace examples }; // Add your subscriptions. const string consumerGroup = "yourConsumerGroup"; + const string topic = "yourTopic"; var subscription = new Dictionary<string, FilterExpression> - { { consumerGroup, new FilterExpression("*") } }; + { { topic, new FilterExpression("*") } }; // In most case, you don't need to create too many consumers, single pattern is recommended. var simpleConsumer = new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription); diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 4849857b..89208db5 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -191,19 +191,33 @@ namespace Org.Apache.Rocketmq private async void UpdateTopicRouteCache() { - Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); - foreach (var topic in GetTopics()) + try { - await FetchTopicRoute(topic); + Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); + foreach (var topic in GetTopics()) + { + await FetchTopicRoute(topic); + } + } + catch (Exception e) + { + Logger.Error(e, $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}"); } } private async void SyncSettings() { - var totalRouteEndpoints = GetTotalRouteEndpoints(); - foreach (var (_, session) in totalRouteEndpoints.Select(GetSession)) + try { - await session.SyncSettings(false); + var totalRouteEndpoints = GetTotalRouteEndpoints(); + foreach (var (_, session) in totalRouteEndpoints.Select(GetSession)) + { + await session.SyncSettings(false); + } + } + catch (Exception e) + { + Logger.Error(e, $"[Bug] unexpected exception raised during setting sync, clientId={ClientId}"); } } @@ -279,36 +293,50 @@ namespace Org.Apache.Rocketmq private async void Heartbeat() { - var endpoints = GetTotalRouteEndpoints(); - var request = WrapHeartbeatRequest(); - Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new(); - // Collect task into a map. - foreach (var item in endpoints) + try { - var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout); - responses[item] = task; - } + var endpoints = GetTotalRouteEndpoints(); + var request = WrapHeartbeatRequest(); + Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new(); + + // Collect task into a map. + foreach (var item in endpoints) + { + try + { + var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout); + responses[item] = task; + } + catch (Exception e) + { + Logger.Error(e, $"Failed to send heartbeat, endpoints={item}"); + } + } - foreach (var item in responses.Keys) - { - var response = await responses[item]; - var code = response.Status.Code; - if (code.Equals(Proto.Code.Ok)) + foreach (var item in responses.Keys) { - Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); - if (Isolated.TryRemove(item, out _)) + var response = await responses[item]; + var code = response.Status.Code; + + if (code.Equals(Proto.Code.Ok)) { - Logger.Info( - $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}"); + Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); + if (Isolated.TryRemove(item, out _)) + { + Logger.Info($"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}"); + } + + return; } - return; + var statusMessage = response.Status.Message; + Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); } - - var statusMessage = response.Status.Message; - Logger.Info( - $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); + } + catch (Exception e) + { + Logger.Error(e, $"[Bug] unexpected exception raised during heartbeat, clientId={ClientId}"); } } diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs index 890ce877..6f990de7 100644 --- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs +++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs @@ -54,17 +54,9 @@ namespace Org.Apache.Rocketmq private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> t) { - try - { - var response = await t; - Logger.Trace($"Response received: {response}"); - return response; - } - catch (Exception ex) - { - Logger.Error($"Call error: {ex.Message}"); - throw; - } + var response = await t; + Logger.Trace($"Response received: {response}"); + return response; } public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>( diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs index dd3da7bd..c948ef86 100644 --- a/csharp/rocketmq-client-csharp/Session.cs +++ b/csharp/rocketmq-client-csharp/Session.cs @@ -56,6 +56,7 @@ namespace Org.Apache.Rocketmq public async Task SyncSettings(bool awaitResp) { + // TODO await _semaphore.WaitAsync(); try {
