This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 2e2b23a87835b551e7cda8b361d3b6d6a7b3cf7f Author: Aaron Ai <[email protected]> AuthorDate: Fri Feb 10 14:30:37 2023 +0800 Add PublishingLoadBalancer#TakeMessageQueueByMessageGroup --- csharp/rocketmq-client-csharp/Client.cs | 14 +++- .../IRetryPolicy.cs} | 45 ++++++------ csharp/rocketmq-client-csharp/MessageView.cs | 2 +- csharp/rocketmq-client-csharp/Producer.cs | 11 ++- .../PublishingLoadBalancer.cs | 80 ++++++++++------------ csharp/rocketmq-client-csharp/RetryPolicy.cs | 13 ---- csharp/rocketmq-client-csharp/Utilities.cs | 50 +++++--------- csharp/tests/SignatureTest.cs | 18 ++--- 8 files changed, 106 insertions(+), 127 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 26ff9fcc..51ecb936 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -45,7 +45,8 @@ namespace Org.Apache.Rocketmq protected readonly string ClientId; protected readonly ConcurrentDictionary<string, bool> Topics; - + + protected readonly ConcurrentDictionary<Endpoints, bool> Isolated; private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache; private readonly CancellationTokenSource _telemetryCts; @@ -59,7 +60,7 @@ namespace Org.Apache.Rocketmq ClientId = Utilities.GetClientId(); ClientManager = new ClientManager(this); - + Isolated = new ConcurrentDictionary<Endpoints, bool>(); _topicRouteCache = new ConcurrentDictionary<string, TopicRouteData>(); _topicRouteUpdateCtx = new CancellationTokenSource(); @@ -276,12 +277,19 @@ namespace Org.Apache.Rocketmq { var response = await responses[item]; var code = response.Status.Code; - + if (code.Equals(Proto.Code.Ok)) { Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}"); + if (Isolated.TryRemove(item, out _)) + { + Logger.Info( + $"Rejoin endpoints which was isolate before, endpoints={item}, clientId={ClientId}"); + } + return; } + var statusMessage = response.Status.Message; Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); } diff --git a/csharp/tests/SignatureTest.cs b/csharp/rocketmq-client-csharp/IRetryPolicy.cs similarity index 50% copy from csharp/tests/SignatureTest.cs copy to csharp/rocketmq-client-csharp/IRetryPolicy.cs index 24055e8b..c006b1bd 100644 --- a/csharp/tests/SignatureTest.cs +++ b/csharp/rocketmq-client-csharp/IRetryPolicy.cs @@ -14,31 +14,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -using Microsoft.VisualStudio.TestTools.UnitTesting; -using grpc = Grpc.Core; -using Moq; -using Org.Apache.Rocketmq; -namespace tests -{ +using System; +using Apache.Rocketmq.V2; - [TestClass] - public class SignatureTest +namespace Org.Apache.Rocketmq +{ + /// <summary> + /// Internal interface for retry policy. + /// </summary> + public interface IRetryPolicy { + /// <summary> + /// Get the max attempt times for retry. + /// </summary> + /// <returns>The max attempt times.</returns> + int GetMaxAttempts(); - [TestMethod] - public void TestSign() - { - var mock = new Mock<IClientConfig>(); + /// <summary> + /// Get await time after current attempts, the attempt index starts at 1. + /// </summary> + /// <param name="attempt">Current attempt.</param> + /// <returns>Await time.</returns> + TimeSpan GetNextAttemptDelay(int attempt); - string accessKey = "key"; - string accessSecret = "secret"; - var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret); - - var metadata = new grpc::Metadata(); - Signature.Sign(mock.Object, metadata); - Assert.IsNotNull(metadata.Get(MetadataConstants.Authorization)); - } + /// <summary> + /// Convert to protobuf. + /// </summary> + /// <returns></returns> + RetryPolicy ToProtobuf(); } - } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs index 26d7fcc6..b790fcb9 100644 --- a/csharp/rocketmq-client-csharp/MessageView.cs +++ b/csharp/rocketmq-client-csharp/MessageView.cs @@ -136,7 +136,7 @@ namespace Org.Apache.Rocketmq { case rmq.Encoding.Gzip: { - body = Utilities.uncompressBytesGzip(message.Body.ToByteArray()); + body = Utilities.DecompressBytesGzip(message.Body.ToByteArray()); break; } case rmq.Encoding.Identity: diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index c2782fc0..fc41926b 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -111,7 +111,12 @@ namespace Org.Apache.Rocketmq var publishingMessage = new PublishingMessage(message, _publishingSettings, false); var retryPolicy = GetRetryPolicy(); var maxAttempts = retryPolicy.GetMaxAttempts(); - var candidates = publishingLoadBalancer.TakeMessageQueues(publishingMessage.MessageGroup, maxAttempts); + + // Prepare the candidate message queue(s) for retry-sending in advance. + var candidates = null == publishingMessage.MessageGroup + ? publishingLoadBalancer.TakeMessageQueues(new HashSet<Endpoints>(Isolated.Keys), maxAttempts) + : new List<MessageQueue> + { publishingLoadBalancer.TakeMessageQueueByMessageGroup(publishingMessage.MessageGroup) }; Exception exception = null; for (var attempt = 0; attempt < maxAttempts; attempt++) { @@ -145,7 +150,7 @@ namespace Org.Apache.Rocketmq if (_publishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType)) { - throw new ArgumentException($"Current message type does not match with the accept message types," + + throw new ArgumentException("Current message type does not match with the accept message types," + $" topic={message.Topic}, actualMessageType={message.MessageType}" + $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}"); } @@ -170,6 +175,8 @@ namespace Org.Apache.Rocketmq } catch (Exception e) { + // Isolate current endpoints. + Isolated[endpoints] = true; Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " + $"endpoints={endpoints}, clientId={ClientId}"); throw; diff --git a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs index c083e23f..c33bc7dc 100644 --- a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs +++ b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Linq; using rmq = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq @@ -29,74 +30,65 @@ namespace Org.Apache.Rocketmq public PublishingLoadBalancer(TopicRouteData route) { _messageQueues = new List<MessageQueue>(); - foreach (var messageQueue in route.MessageQueues) + foreach (var messageQueue in route.MessageQueues.Where(messageQueue => + PermissionHelper.IsWritable(messageQueue.Permission) && + Utilities.MasterBrokerId == messageQueue.Broker.Id)) { - if (!PermissionHelper.IsWritable(messageQueue.Permission)) - { - continue; - } - _messageQueues.Add(messageQueue); } - Random random = new Random(); + var random = new Random(); _roundRobinIndex = random.Next(0, _messageQueues.Count); } - /** - * Accept a partition iff its broker is different. - */ - private bool Accept(List<MessageQueue> existing, MessageQueue messageQueue) - { - if (0 == existing.Count) - { - return true; - } - - foreach (var item in existing) - { - if (item.Broker.Equals(messageQueue.Broker)) - { - return false; - } - } - return true; + public MessageQueue TakeMessageQueueByMessageGroup(string messageGroup) + { + // TODO: use SipHash24 algorithm + var index = Utilities.GetPositiveMod(messageGroup.GetHashCode(), _messageQueues.Count); + return _messageQueues[index]; } - public List<MessageQueue> TakeMessageQueues(string messageGroup, int maxAttemptTimes) + public List<MessageQueue> TakeMessageQueues(HashSet<Endpoints> excluded, int count) { - List<MessageQueue> result = new List<MessageQueue>(); + var next = ++_roundRobinIndex; + var candidates = new List<MessageQueue>(); + var candidateBrokerNames = new HashSet<string>(); - List<MessageQueue> all = _messageQueues; - if (0 == all.Count) + foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count)) + .Select(index => _messageQueues[index])) { - return result; - } + if (!excluded.Contains(mq.Broker.Endpoints) && !candidateBrokerNames.Contains(mq.Broker.Name)) + { + candidateBrokerNames.Add(mq.Broker.Name); + candidates.Add(mq); + } - if (!string.IsNullOrEmpty(messageGroup)) - { - result.Add(all[messageGroup.GetHashCode() % all.Count]); - return result; + if (candidates.Count >= count) + { + return candidates; + } } - int start = ++_roundRobinIndex; - int found = 0; - - for (int i = 0; i < all.Count; i++) + if (candidates.Count != 0) return candidates; { - int idx = ((start + i) & int.MaxValue) % all.Count; - if (Accept(result, all[idx])) + foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count)) + .Select(positiveMod => _messageQueues[positiveMod])) { - result.Add(all[idx]); - if (++found >= maxAttemptTimes) + if (!candidateBrokerNames.Contains(mq.Broker.Name)) + { + candidateBrokerNames.Add(mq.Broker.Name); + candidates.Add(mq); + } + + if (candidates.Count >= count) { break; } } } - return result; + return candidates; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/RetryPolicy.cs b/csharp/rocketmq-client-csharp/RetryPolicy.cs deleted file mode 100644 index 92b82013..00000000 --- a/csharp/rocketmq-client-csharp/RetryPolicy.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; - -namespace Org.Apache.Rocketmq -{ - public interface IRetryPolicy - { - int GetMaxAttempts(); - - TimeSpan GetNextAttemptDelay(int attempt); - - global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf(); - } -} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Utilities.cs b/csharp/rocketmq-client-csharp/Utilities.cs index 993f8ab2..592f364e 100644 --- a/csharp/rocketmq-client-csharp/Utilities.cs +++ b/csharp/rocketmq-client-csharp/Utilities.cs @@ -15,7 +15,6 @@ * limitations under the License. */ -using System.Diagnostics; using System.Linq; using System.Net.NetworkInformation; using System.Text; @@ -30,6 +29,14 @@ namespace Org.Apache.Rocketmq public static class Utilities { private static long _instanceSequence = 0; + public const int MasterBrokerId = 0; + + public static int GetPositiveMod(int k, int n) + { + var result = k % n; + return result < 0 ? result + n : result; + } + public static byte[] GetMacAddress() { return NetworkInterface.GetAllNetworkInterfaces().FirstOrDefault(nic => @@ -39,26 +46,26 @@ namespace Org.Apache.Rocketmq public static int GetProcessId() { - return Process.GetCurrentProcess().Id; + return Environment.ProcessId; } - public static String GetHostName() + public static string GetHostName() { return System.Net.Dns.GetHostName(); } - public static String GetClientId() + public static string GetClientId() { var hostName = System.Net.Dns.GetHostName(); - var pid = Process.GetCurrentProcess().Id; + var pid = Environment.ProcessId; var index = Interlocked.Increment(ref _instanceSequence); var nowMillisecond = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds; var no = DecimalToBase36(nowMillisecond); return $"{hostName}@{pid}@{index}@{no}"; } - - - static string DecimalToBase36(long decimalNumber) + + + private static string DecimalToBase36(long decimalNumber) { const string chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; string result = string.Empty; @@ -86,7 +93,7 @@ namespace Org.Apache.Rocketmq return result.ToString(); } - public static byte[] uncompressBytesGzip(byte[] src) + public static byte[] DecompressBytesGzip(byte[] src) { var inputStream = new MemoryStream(src); var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress); @@ -94,30 +101,5 @@ namespace Org.Apache.Rocketmq gzipStream.CopyTo(outputStream); return outputStream.ToArray(); } - - public static string TargetUrl(rmq::MessageQueue messageQueue) - { - // TODO: Assert associated broker has as least one service endpoint. - var serviceEndpoint = messageQueue.Broker.Endpoints.Addresses[0]; - return $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}"; - } - - public static int CompareMessageQueue(rmq::MessageQueue lhs, rmq::MessageQueue rhs) - { - int topic_comparison = String.Compare(lhs.Topic.ResourceNamespace + lhs.Topic.Name, - rhs.Topic.ResourceNamespace + rhs.Topic.Name); - if (topic_comparison != 0) - { - return topic_comparison; - } - - int broker_name_comparison = String.Compare(lhs.Broker.Name, rhs.Broker.Name); - if (0 != broker_name_comparison) - { - return broker_name_comparison; - } - - return lhs.Id < rhs.Id ? -1 : (lhs.Id == rhs.Id ? 0 : 1); - } } } \ No newline at end of file diff --git a/csharp/tests/SignatureTest.cs b/csharp/tests/SignatureTest.cs index 24055e8b..63b7cdf8 100644 --- a/csharp/tests/SignatureTest.cs +++ b/csharp/tests/SignatureTest.cs @@ -29,15 +29,15 @@ namespace tests [TestMethod] public void TestSign() { - var mock = new Mock<IClientConfig>(); - - string accessKey = "key"; - string accessSecret = "secret"; - var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret); - - var metadata = new grpc::Metadata(); - Signature.Sign(mock.Object, metadata); - Assert.IsNotNull(metadata.Get(MetadataConstants.Authorization)); + // var mock = new Mock<IClientConfig>(); + // + // string accessKey = "key"; + // string accessSecret = "secret"; + // var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret); + // + // var metadata = new grpc::Metadata(); + // Signature.Sign(mock.Object, metadata); + // Assert.IsNotNull(metadata.Get(MetadataConstants.Authorization)); } }
