This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 44943d58fd2d6715b4532b7844ffa61e6baa28a8 Author: Aaron Ai <[email protected]> AuthorDate: Mon Feb 13 19:19:49 2023 +0800 Polish logs --- csharp/examples/ProducerBenchmark.cs | 74 ++++++++++++++++++++++++++++++- csharp/examples/QuickStart.cs | 3 +- csharp/rocketmq-client-csharp/Producer.cs | 15 +++++-- 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs index 361aa95d..8ad03847 100644 --- a/csharp/examples/ProducerBenchmark.cs +++ b/csharp/examples/ProducerBenchmark.cs @@ -15,6 +15,10 @@ * limitations under the License. */ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; using System.Threading.Tasks; using NLog; using Org.Apache.Rocketmq; @@ -25,8 +29,76 @@ namespace examples { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); - internal static async Task QuickStart() + private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0); + private static long _counter = 0; + + internal static void QuickStart() { + const string accessKey = "5jFk0wK7OU6Uq395"; + const string secretKey = "V1u8z19URHs4o6RQ"; + + // Credential provider is optional for client configuration. + var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); + const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + var clientConfig = new ClientConfig(endpoints) + { + CredentialsProvider = credentialsProvider + }; + // In most case, you don't need to create too many producers, single pattern is recommended. + var producer = new Producer(clientConfig); + + const string topic = "lingchu_normal_topic"; + producer.SetTopics(topic); + // Set the topic name(s), which is optional but recommended. It makes producer could prefetch + // the topic route before message publishing. + producer.Start().Wait(); + // Define your message body. + var bytes = Encoding.UTF8.GetBytes("foobar"); + const string tag = "yourMessageTagA"; + // You could set multiple keys for the single message. + var keys = new List<string> + { + "yourMessageKey-7044358f98fc", + "yourMessageKey-f72539fbc246" + }; + // Set topic for current message. + var message = new Message(topic, bytes) + { + Tag = tag, + Keys = keys + }; + + const int tpsLimit = 1000; + + Task.Run(async () => + { + while (true) + { + _semaphore.Release(tpsLimit/1000); + await Task.Delay(TimeSpan.FromMilliseconds(1)); + } + }); + + Task.Run(async () => + { + while (true) + { + Logger.Info($"Send {_counter} messages successfully."); + Interlocked.Exchange(ref _counter, 0); + await Task.Delay(TimeSpan.FromSeconds(1)); + } + }); + + var tasks = new List<Task>(); + while (true) + { + _semaphore.Wait(); + Interlocked.Increment(ref _counter); + var task = producer.Send(message); + tasks.Add(task); + } + + Task.WhenAll(tasks).Wait(); } } } \ No newline at end of file diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs index 4d6b423d..8323218f 100644 --- a/csharp/examples/QuickStart.cs +++ b/csharp/examples/QuickStart.cs @@ -27,10 +27,11 @@ namespace examples { // Console.WriteLine(MetadataConstants.Instance.ClientVersion); - ProducerNormalMessageExample.QuickStart().Wait(); + // ProducerNormalMessageExample.QuickStart().Wait(); // await ProducerFifoMessageExample.QuickStart(); // await ProducerDelayMessageExample.QuickStart(); // await SimpleConsumerExample.QuickStart(); + ProducerBenchmark.QuickStart(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 6a9040ec..cc7794f6 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -42,7 +42,8 @@ namespace Org.Apache.Rocketmq { } - private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, int maxAttempts) : + private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, + int maxAttempts) : base(clientConfig, publishingTopics.Keys) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); @@ -120,7 +121,7 @@ namespace Org.Apache.Rocketmq : new List<MessageQueue> { publishingLoadBalancer.TakeMessageQueueByMessageGroup(publishingMessage.MessageGroup) }; Exception exception = null; - for (var attempt = 0; attempt < maxAttempts; attempt++) + for (var attempt = 1; attempt <= maxAttempts; attempt++) { try { @@ -179,8 +180,16 @@ namespace Org.Apache.Rocketmq { // Isolate current endpoints. Isolated[endpoints] = true; + if (attempt >= maxAttempts) + { + Logger.Error( + $"Failed to send message finally, run out of attempt times, topic={message.Topic}, " + + $"maxAttempt={maxAttempts}, attempt={attempt}, endpoints={endpoints}, clientId={ClientId}"); + throw; + } + Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " + - $"endpoints={endpoints}, clientId={ClientId}"); + $"attempt={attempt}, endpoints={endpoints}, clientId={ClientId}"); throw; } }
