This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 8417c0a9beaf055b17073c79e7db7008a9c87b23 Author: Aaron Ai <[email protected]> AuthorDate: Tue Feb 7 19:03:30 2023 +0800 Add more logs --- csharp/examples/ProducerNormalMessageExample.cs | 4 ++- csharp/rocketmq-client-csharp/Client.cs | 38 ++++++++++++++++++---- .../ClientLoggerInterceptor.cs | 4 +-- csharp/rocketmq-client-csharp/Endpoints.cs | 7 +++- csharp/rocketmq-client-csharp/Producer.cs | 15 ++++----- csharp/rocketmq-client-csharp/TopicRouteData.cs | 26 ++++++++++----- 6 files changed, 67 insertions(+), 27 deletions(-) diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs index c22bc695..1f80671c 100644 --- a/csharp/examples/ProducerNormalMessageExample.cs +++ b/csharp/examples/ProducerNormalMessageExample.cs @@ -17,6 +17,7 @@ using System.Collections.Generic; using System.Text; +using System.Threading; using System.Threading.Tasks; using NLog; using Org.Apache.Rocketmq; @@ -64,8 +65,9 @@ namespace examples }; var sendReceipt = await producer.Send(message); Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); + Thread.Sleep(9999999); // Close the producer if you don't need it anymore. - await producer.Shutdown(); + // await producer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index c4392e2d..a1c4b821 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -181,6 +181,7 @@ namespace Org.Apache.Rocketmq private async void UpdateTopicRouteCache() { + Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); foreach (var topic in Topics.Keys) { var topicRouteData = await FetchTopicRoute(topic); @@ -230,9 +231,17 @@ namespace Org.Apache.Rocketmq Endpoints = ClientConfig.Endpoints.ToProtobuf() }; - var queryRouteResponse = + var response = await Manager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout); - var messageQueues = queryRouteResponse.MessageQueues.ToList(); + var code = response.Status.Code; + if (!Proto.Code.Ok.Equals(code)) + { + Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " + + $"statusMessage={response.Status.Message}"); + } + StatusChecker.Check(response.Status, request); + + var messageQueues = response.MessageQueues.ToList(); return new TopicRouteData(messageQueues); } @@ -246,11 +255,26 @@ namespace Org.Apache.Rocketmq } var request = WrapHeartbeatRequest(); - - var tasks = endpoints.Select(endpoint => Manager.Heartbeat(endpoint, request, ClientConfig.RequestTimeout)) - .Cast<Task>().ToList(); - - await Task.WhenAll(tasks); + Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new (); + // Collect task into a map. + foreach (var item in endpoints) + { + var task = Manager.Heartbeat(item, request, ClientConfig.RequestTimeout); + responses[item]= task; + } + foreach (var item in responses.Keys) + { + var response = await responses[item]; + var code = response.Status.Code; + + if (code.Equals(Proto.Code.Ok)) + { + Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); + return; + } + var statusMessage = response.Status.Message; + Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); + } } diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs index 01adddc8..d9622291 100644 --- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs +++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs @@ -56,7 +56,7 @@ namespace Org.Apache.Rocketmq try { var response = await t; - Logger.Debug($"Response received: {response}"); + Logger.Trace($"Response received: {response}"); return response; } catch (Exception ex) @@ -101,7 +101,7 @@ namespace Org.Apache.Rocketmq where TRequest : class where TResponse : class { - Logger.Debug($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}"); + Logger.Trace($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}"); } private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context) diff --git a/csharp/rocketmq-client-csharp/Endpoints.cs b/csharp/rocketmq-client-csharp/Endpoints.cs index 3228881c..e7cf5f9c 100644 --- a/csharp/rocketmq-client-csharp/Endpoints.cs +++ b/csharp/rocketmq-client-csharp/Endpoints.cs @@ -67,6 +67,11 @@ namespace Org.Apache.Rocketmq Addresses = addresses; } + public override string ToString() + { + return GrpcTarget; + } + public string GrpcTarget { // TODO @@ -82,7 +87,7 @@ namespace Org.Apache.Rocketmq return ""; } } - + public bool Equals(Endpoints other) { if (ReferenceEquals(null, other)) diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 9fa10ad3..56ec5b40 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -19,7 +19,12 @@ namespace Org.Apache.Rocketmq { } - public Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) : + public Producer(ClientConfig clientConfig, int maxAttempts) : this(clientConfig, + new ConcurrentDictionary<string, bool>(), maxAttempts) + { + } + + private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) : base(clientConfig, topics) { var retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts); @@ -72,12 +77,6 @@ namespace Org.Apache.Rocketmq if (!_publishingRouteDataCache.TryGetValue(message.Topic, out var publishingLoadBalancer)) { var topicRouteData = await FetchTopicRoute(message.Topic); - if (null == topicRouteData || null == topicRouteData.MessageQueues || - 0 == topicRouteData.MessageQueues.Count) - { - throw new TopicRouteException($"No topic route for {message.Topic}"); - } - publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); _publishingRouteDataCache.TryAdd(message.Topic, publishingLoadBalancer); } @@ -87,7 +86,7 @@ namespace Org.Apache.Rocketmq var maxAttempts = retryPolicy.getMaxAttempts(); var candidates = publishingLoadBalancer.TakeMessageQueues(publishingMessage.MessageGroup, maxAttempts); Exception exception = null; - for (int attempt = 0; attempt < maxAttempts; attempt++) + for (var attempt = 0; attempt < maxAttempts; attempt++) { try { diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs index 05418eaa..2be2b9a5 100644 --- a/csharp/rocketmq-client-csharp/TopicRouteData.cs +++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs @@ -26,7 +26,7 @@ namespace Org.Apache.Rocketmq { public TopicRouteData(List<rmq::MessageQueue> messageQueues) { - List<MessageQueue> messageQueuesList = new List<MessageQueue>(); + var messageQueuesList = new List<MessageQueue>(); foreach (var mq in messageQueues) { messageQueuesList.Add(new MessageQueue(mq)); @@ -40,17 +40,27 @@ namespace Org.Apache.Rocketmq public bool Equals(TopicRouteData other) { - if (ReferenceEquals(null, other)) return false; - if (ReferenceEquals(this, other)) return true; - return Equals(MessageQueues, other.MessageQueues); + if (ReferenceEquals(null, other)) + { + return false; + } + + return ReferenceEquals(this, other) || Equals(MessageQueues, other.MessageQueues); } public override bool Equals(object obj) { - if (ReferenceEquals(null, obj)) return false; - if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != this.GetType()) return false; - return Equals((TopicRouteData)obj); + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + return obj.GetType() == GetType() && Equals((TopicRouteData)obj); } public override int GetHashCode()
