This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 6cde562e368dbe947a009fc4cba3bec252a2b649
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Feb 15 15:54:03 2023 +0800

    Add error log for scheduled task
---
 csharp/examples/ProducerBenchmark.cs               | 12 ++--
 csharp/examples/SimpleConsumerExample.cs           |  3 +-
 csharp/rocketmq-client-csharp/Client.cs            | 84 ++++++++++++++--------
 .../ClientLoggerInterceptor.cs                     | 14 +---
 csharp/rocketmq-client-csharp/Session.cs           |  1 +
 5 files changed, 68 insertions(+), 46 deletions(-)

diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 8ad03847..4e334104 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -34,12 +34,12 @@ namespace examples
 
         internal static void QuickStart()
         {
-            const string accessKey = "5jFk0wK7OU6Uq395";
-            const string secretKey = "V1u8z19URHs4o6RQ";
+            const string accessKey = "amKhwEM40L61znSz";
+            const string secretKey = "bT6c3gpF3EFB10F3";
 
             // Credential provider is optional for client configuration.
             var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+            const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080";
             var clientConfig = new ClientConfig(endpoints)
             {
                 CredentialsProvider = credentialsProvider
@@ -68,14 +68,14 @@ namespace examples
                 Keys = keys
             };
 
-            const int tpsLimit = 1000;
+            const int tpsLimit = 800;
 
             Task.Run(async () =>
             {
                 while (true)
                 {
-                    _semaphore.Release(tpsLimit/1000);
-                    await Task.Delay(TimeSpan.FromMilliseconds(1));
+                    _semaphore.Release(tpsLimit);
+                    await Task.Delay(TimeSpan.FromMilliseconds(1000));
                 }
             });
 
diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs
index b41125c8..fa78e845 100644
--- a/csharp/examples/SimpleConsumerExample.cs
+++ b/csharp/examples/SimpleConsumerExample.cs
@@ -41,8 +41,9 @@ namespace examples
             };
             // Add your subscriptions.
             const string consumerGroup = "yourConsumerGroup";
+            const string topic = "yourTopic";
             var subscription = new Dictionary<string, FilterExpression>
-                { { consumerGroup, new FilterExpression("*") } };
+                { { topic, new FilterExpression("*") } };
             // In most case, you don't need to create too many consumers, single pattern is recommended.
             var simpleConsumer =
                 new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription);
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 4849857b..89208db5 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -191,19 +191,33 @@ namespace Org.Apache.Rocketmq
 
         private async void UpdateTopicRouteCache()
         {
-            Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
-            foreach (var topic in GetTopics())
+            try
             {
-                await FetchTopicRoute(topic);
+                Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
+                foreach (var topic in GetTopics())
+                {
+                    await FetchTopicRoute(topic);
+                }
+            }
+            catch (Exception e)
+            {
+                Logger.Error(e, $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}");
             }
         }
 
         private async void SyncSettings()
         {
-            var totalRouteEndpoints = GetTotalRouteEndpoints();
-            foreach (var (_, session) in totalRouteEndpoints.Select(GetSession))
+            try
             {
-                await session.SyncSettings(false);
+                var totalRouteEndpoints = GetTotalRouteEndpoints();
+                foreach (var (_, session) in totalRouteEndpoints.Select(GetSession))
+                {
+                    await session.SyncSettings(false);
+                }
+            }
+            catch (Exception e)
+            {
+                Logger.Error(e, $"[Bug] unexpected exception raised during setting sync, clientId={ClientId}");
             }
         }
 
@@ -279,36 +293,50 @@ namespace Org.Apache.Rocketmq
 
         private async void Heartbeat()
         {
-            var endpoints = GetTotalRouteEndpoints();
-            var request = WrapHeartbeatRequest();
-            Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new();
-            // Collect task into a map.
-            foreach (var item in endpoints)
+            try
             {
-                var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
-                responses[item] = task;
-            }
+                var endpoints = GetTotalRouteEndpoints();
+                var request = WrapHeartbeatRequest();
+                Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new();
+
+                // Collect task into a map.
+                foreach (var item in endpoints)
+                {
+                    try
+                    {
+                        var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
+                        responses[item] = task;
+                    }
+                    catch (Exception e)
+                    {
+                        Logger.Error(e, $"Failed to send heartbeat, endpoints={item}");
+                    }
+                }
 
-            foreach (var item in responses.Keys)
-            {
-                var response = await responses[item];
-                var code = response.Status.Code;
 
-                if (code.Equals(Proto.Code.Ok))
+                foreach (var item in responses.Keys)
                 {
-                    Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
-                    if (Isolated.TryRemove(item, out _))
+                    var response = await responses[item];
+                    var code = response.Status.Code;
+
+                    if (code.Equals(Proto.Code.Ok))
                     {
-                        Logger.Info(
-                            $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}");
+                        Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
+                        if (Isolated.TryRemove(item, out _))
+                        {
+                            Logger.Info($"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}");
+                        }
+
+                        return;
                     }
 
-                    return;
+                    var statusMessage = response.Status.Message;
+                    Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
                 }
-
-                var statusMessage = response.Status.Message;
-                Logger.Info(
-                    $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
+            }
+            catch (Exception e)
+            {
+                Logger.Error(e, $"[Bug] unexpected exception raised during heartbeat, clientId={ClientId}");
             }
         }
 
diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index 890ce877..6f990de7 100644
--- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -54,17 +54,9 @@ namespace Org.Apache.Rocketmq
 
         private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> t)
         {
-            try
-            {
-                var response = await t;
-                Logger.Trace($"Response received: {response}");
-                return response;
-            }
-            catch (Exception ex)
-            {
-                Logger.Error($"Call error: {ex.Message}");
-                throw;
-            }
+            var response = await t;
+            Logger.Trace($"Response received: {response}");
+            return response;
         }
 
         public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index dd3da7bd..c948ef86 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -56,6 +56,7 @@ namespace Org.Apache.Rocketmq
 
         public async Task SyncSettings(bool awaitResp)
         {
+            // TODO
             await _semaphore.WaitAsync();
             try
             {

Reply via email to