This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit e3fbe1dbdfee0fe81b7f0e41cd8784ed42fd4a0e Author: Aaron Ai <[email protected]> AuthorDate: Thu Feb 23 18:42:48 2023 +0800 Implement ExponentialBackoffRetryPolicy --- csharp/examples/ProducerDelayMessageExample.cs | 2 +- csharp/examples/ProducerFifoMessageExample.cs | 8 ++--- csharp/examples/ProducerNormalMessageExample.cs | 8 ++--- .../examples/ProducerTransactionMessageExample.cs | 4 +-- .../ExponentialBackoffRetryPolicy.cs | 42 ++++++++++++++++++++-- csharp/rocketmq-client-csharp/IRetryPolicy.cs | 3 ++ .../rocketmq-client-csharp/PublishingSettings.cs | 18 ++++++++-- .../SimpleSubscriptionSettings.cs | 6 +++- 8 files changed, 73 insertions(+), 18 deletions(-) diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs index 9ad5fbb9..edb356a5 100644 --- a/csharp/examples/ProducerDelayMessageExample.cs +++ b/csharp/examples/ProducerDelayMessageExample.cs @@ -34,7 +34,7 @@ namespace examples const string secretKey = "yourSecretKey"; // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + const string endpoints = "foobar.com:8080"; var clientConfig = new ClientConfig(endpoints) { CredentialsProvider = credentialsProvider diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs index 87f953c3..bfca32f5 100644 --- a/csharp/examples/ProducerFifoMessageExample.cs +++ b/csharp/examples/ProducerFifoMessageExample.cs @@ -30,12 +30,12 @@ namespace examples internal static async Task QuickStart() { - const string accessKey = "5jFk0wK7OU6Uq395"; - const string secretKey = "V1u8z19URHs4o6RQ"; + const string accessKey = "yourAccessKey"; + const string secretKey = "yourSecretKey"; // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + const string endpoints = "foobar.com:8080"; var clientConfig = new ClientConfig(endpoints) { CredentialsProvider = credentialsProvider @@ -43,7 +43,7 @@ namespace examples // In most case, you don't need to create too many producers, single pattern is recommended. var producer = new Producer(clientConfig); - const string topic = "lingchu_fifo_topic"; + const string topic = "yourFifoTopic"; producer.SetTopics(topic); // Set the topic name(s), which is optional but recommended. It makes producer could prefetch // the topic route before message publishing. diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs index 3274ade2..09e4dff1 100644 --- a/csharp/examples/ProducerNormalMessageExample.cs +++ b/csharp/examples/ProducerNormalMessageExample.cs @@ -30,12 +30,12 @@ namespace examples internal static async Task QuickStart() { - const string accessKey = "5jFk0wK7OU6Uq395"; - const string secretKey = "V1u8z19URHs4o6RQ"; + const string accessKey = "yourAccessKey"; + const string secretKey = "yourSecretKey"; // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + const string endpoints = "foobar.com:8080"; var clientConfig = new ClientConfig(endpoints) { CredentialsProvider = credentialsProvider @@ -43,7 +43,7 @@ namespace examples // In most case, you don't need to create too many producers, single pattern is recommended. var producer = new Producer(clientConfig); - const string topic = "lingchu_normal_topic"; + const string topic = "yourNormalTopic"; producer.SetTopics(topic); // Set the topic name(s), which is optional but recommended. It makes producer could prefetch // the topic route before message publishing. diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs index 8b331722..10b61142 100644 --- a/csharp/examples/ProducerTransactionMessageExample.cs +++ b/csharp/examples/ProducerTransactionMessageExample.cs @@ -41,7 +41,7 @@ namespace examples const string secretKey = "yourSecretKey"; // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + const string endpoints = "foobar.com:8080"; var clientConfig = new ClientConfig(endpoints) { CredentialsProvider = credentialsProvider @@ -49,7 +49,7 @@ namespace examples // In most case, you don't need to create too many producers, single pattern is recommended. var producer = new Producer(clientConfig); - const string topic = "lingchu_transactional_topic"; + const string topic = "yourTransactionTopic"; producer.SetTopics(topic); producer.SetTransactionChecker(new TransactionChecker()); diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs index 094c2607..ddc4d281 100644 --- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs +++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs @@ -1,5 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System; -using Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; using Google.Protobuf.WellKnownTypes; namespace Org.Apache.Rocketmq @@ -28,9 +45,28 @@ namespace Org.Apache.Rocketmq public double BackoffMultiplier { get; } + public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy) + { + if (retryPolicy.StrategyCase != Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff) + { + throw new InvalidOperationException("Strategy must be exponential backoff"); + } + + return InheritBackoff(retryPolicy.ExponentialBackoff); + } + + private IRetryPolicy InheritBackoff(Proto.ExponentialBackoff retryPolicy) + { + return new ExponentialBackoffRetryPolicy(_maxAttempts, retryPolicy.Initial.ToTimeSpan(), + retryPolicy.Max.ToTimeSpan(), retryPolicy.Multiplier); + } + public TimeSpan GetNextAttemptDelay(int attempt) { - return TimeSpan.Zero; + var delayMillis = Math.Min( + InitialBackoff.TotalMilliseconds * Math.Pow(BackoffMultiplier, 1.0 * (attempt - 1)), + MaxBackoff.TotalMilliseconds); + return delayMillis < 0 ? TimeSpan.Zero : TimeSpan.FromMilliseconds(delayMillis); } public static ExponentialBackoffRetryPolicy ImmediatelyRetryPolicy(int maxAttempts) @@ -40,7 +76,7 @@ namespace Org.Apache.Rocketmq public global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf() { - var exponentialBackoff = new ExponentialBackoff + var exponentialBackoff = new Proto.ExponentialBackoff { Multiplier = (float)BackoffMultiplier, Max = Duration.FromTimeSpan(MaxBackoff), diff --git a/csharp/rocketmq-client-csharp/IRetryPolicy.cs b/csharp/rocketmq-client-csharp/IRetryPolicy.cs index c006b1bd..86d280eb 100644 --- a/csharp/rocketmq-client-csharp/IRetryPolicy.cs +++ b/csharp/rocketmq-client-csharp/IRetryPolicy.cs @@ -17,6 +17,7 @@ using System; using Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -43,5 +44,7 @@ namespace Org.Apache.Rocketmq /// </summary> /// <returns></returns> RetryPolicy ToProtobuf(); + + IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs index b9f8f454..17830a00 100644 --- a/csharp/rocketmq-client-csharp/PublishingSettings.cs +++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs @@ -19,18 +19,21 @@ using System; using System.Collections.Concurrent; using System.Linq; using Google.Protobuf.WellKnownTypes; +using NLog; using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class PublishingSettings : Settings { + private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); + private volatile int _maxBodySizeBytes = 4 * 1024 * 1024; private volatile bool _validateMessageType = true; public PublishingSettings(string clientId, Endpoints endpoints, ExponentialBackoffRetryPolicy retryPolicy, - TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, endpoints, - retryPolicy, requestTimeout) + TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, + endpoints, retryPolicy, requestTimeout) { Topics = topics; } @@ -49,7 +52,16 @@ namespace Org.Apache.Rocketmq public override void Sync(Proto::Settings settings) { - // TODO + if (Proto.Settings.PubSubOneofCase.Publishing != settings.PubSubCase) + { + Logger.Error($"[Bug] Issued settings does not match with the client type, clientId={ClientId}, " + + $"pubSubCase={settings.PubSubCase}, clientType={ClientType}"); + return; + } + + RetryPolicy = RetryPolicy.InheritBackoff(settings.BackoffPolicy); + _validateMessageType = settings.Publishing.ValidateMessageType; + _maxBodySizeBytes = settings.Publishing.MaxBodySize; } public override Proto.Settings ToProtobuf() diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs index a6a409d7..c83cca7d 100644 --- a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs +++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs @@ -44,7 +44,11 @@ namespace Org.Apache.Rocketmq public override void Sync(Proto::Settings settings) { - // TODO + if (Proto.Settings.PubSubOneofCase.Subscription != settings.PubSubCase) + { + Logger.Error($"[Bug] Issued settings doesn't match with the client type, clientId={ClientId}, " + + $"pubSubCase={settings.PubSubCase}, clientType={ClientType}"); + } } public override Proto.Settings ToProtobuf()
