This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 9c5d8858460e0676e302abcc3cc64733f737852b Author: Aaron Ai <[email protected]> AuthorDate: Tue Feb 7 20:00:08 2023 +0800 Polish code --- csharp/examples/ProducerNormalMessageExample.cs | 2 +- csharp/rocketmq-client-csharp/AccessPoint.cs | 74 ------------ csharp/rocketmq-client-csharp/Address.cs | 3 +- .../AddressListEqualityComparer.cs | 17 +++ csharp/rocketmq-client-csharp/AddressScheme.cs | 17 +++ csharp/rocketmq-client-csharp/Broker.cs | 21 +++- csharp/rocketmq-client-csharp/Client.cs | 47 +++++--- csharp/rocketmq-client-csharp/ClientConfig.cs | 8 -- .../ClientLoggerInterceptor.cs | 1 + csharp/rocketmq-client-csharp/ClientManager.cs | 1 - csharp/rocketmq-client-csharp/ClientType.cs | 20 ++-- .../ConfigFileCredentialsProvider.cs | 1 + csharp/rocketmq-client-csharp/Endpoints.cs | 34 ++++-- .../ExponentialBackoffRetryPolicy.cs | 14 +-- csharp/rocketmq-client-csharp/IClient.cs | 30 ++++- csharp/rocketmq-client-csharp/IClientConfig.cs | 13 +- csharp/rocketmq-client-csharp/IClientManager.cs | 6 +- .../rocketmq-client-csharp/ICredentialsProvider.cs | 1 + csharp/rocketmq-client-csharp/Message.cs | 75 ++---------- csharp/rocketmq-client-csharp/MessageException.cs | 29 ----- .../rocketmq-client-csharp/MessageIdGenerator.cs | 26 ++-- csharp/rocketmq-client-csharp/MessageQueue.cs | 17 +++ csharp/rocketmq-client-csharp/MessageType.cs | 34 +++--- csharp/rocketmq-client-csharp/MessageView.cs | 2 +- csharp/rocketmq-client-csharp/MqEncoding.cs | 30 +++-- csharp/rocketmq-client-csharp/MqLogManager.cs | 1 + csharp/rocketmq-client-csharp/Permission.cs | 34 +++--- csharp/rocketmq-client-csharp/Producer.cs | 57 ++++++--- csharp/rocketmq-client-csharp/PublishingMessage.cs | 27 ++--- .../rocketmq-client-csharp/PublishingSettings.cs | 19 ++- csharp/rocketmq-client-csharp/Resource.cs | 8 +- csharp/rocketmq-client-csharp/RetryPolicy.cs | 8 +- csharp/rocketmq-client-csharp/SendReceipt.cs | 17 +-- csharp/rocketmq-client-csharp/SequenceGenerator.cs | 131 --------------------- csharp/rocketmq-client-csharp/Session.cs | 19 +-- csharp/rocketmq-client-csharp/Settings.cs | 29 ++++- csharp/rocketmq-client-csharp/Signature.cs | 53 ++++----- csharp/rocketmq-client-csharp/StatusChecker.cs | 2 +- .../SubscriptionLoadBalancer.cs | 1 - .../rocketmq-client-csharp/TopicRouteException.cs | 1 + csharp/tests/MessageIdGeneratorTest.cs | 4 +- csharp/tests/SequenceGeneratorTest.cs | 49 -------- 42 files changed, 395 insertions(+), 588 deletions(-) diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs index 1f80671c..16791a13 100644 --- a/csharp/examples/ProducerNormalMessageExample.cs +++ b/csharp/examples/ProducerNormalMessageExample.cs @@ -24,7 +24,7 @@ using Org.Apache.Rocketmq; namespace examples { - static class ProducerNormalMessageExample + internal static class ProducerNormalMessageExample { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs b/csharp/rocketmq-client-csharp/AccessPoint.cs deleted file mode 100644 index f05fa293..00000000 --- a/csharp/rocketmq-client-csharp/AccessPoint.cs +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using rmq = Apache.Rocketmq.V2; -using System.Net; -using System.Net.Sockets; - -namespace Org.Apache.Rocketmq -{ - public class AccessPoint - { - public AccessPoint() - { - - } - - public AccessPoint(string accessUrl) - { - string[] segments = accessUrl.Split(":"); - if (segments.Length != 2) - { - throw new ArgumentException("Access url should be of format host:port"); - } - - Host = segments[0]; - Port = Int32.Parse(segments[1]); - } - - public string Host { get; } - - public int Port { get; set; } - - public string TargetUrl() - { - return $"https://{Host}:{Port}"; - } - - public rmq::AddressScheme HostScheme() - { - return SchemeOf(Host); - } - - private static rmq::AddressScheme SchemeOf(string host) - { - var result = IPAddress.TryParse(host, out var ip); - if (!result) - { - return rmq::AddressScheme.DomainName; - } - - return ip.AddressFamily switch - { - AddressFamily.InterNetwork => rmq::AddressScheme.Ipv4, - AddressFamily.InterNetworkV6 => rmq::AddressScheme.Ipv6, - _ => rmq::AddressScheme.Unspecified - }; - } - } -} diff --git a/csharp/rocketmq-client-csharp/Address.cs b/csharp/rocketmq-client-csharp/Address.cs index 316323c9..fca83530 100644 --- a/csharp/rocketmq-client-csharp/Address.cs +++ b/csharp/rocketmq-client-csharp/Address.cs @@ -57,8 +57,7 @@ namespace Org.Apache.Rocketmq return true; } - if (obj.GetType() != this.GetType()) return false; - return Equals((Address)obj); + return obj.GetType() == GetType() && Equals((Address)obj); } public override int GetHashCode() diff --git a/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs b/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs index 5b793f37..b8aff27a 100644 --- a/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs +++ b/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System.Collections.Generic; using System.Linq; diff --git a/csharp/rocketmq-client-csharp/AddressScheme.cs b/csharp/rocketmq-client-csharp/AddressScheme.cs index f9c1c290..6f36f546 100644 --- a/csharp/rocketmq-client-csharp/AddressScheme.cs +++ b/csharp/rocketmq-client-csharp/AddressScheme.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using rmq = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq diff --git a/csharp/rocketmq-client-csharp/Broker.cs b/csharp/rocketmq-client-csharp/Broker.cs index 370ac96a..6b426a5a 100644 --- a/csharp/rocketmq-client-csharp/Broker.cs +++ b/csharp/rocketmq-client-csharp/Broker.cs @@ -1,10 +1,27 @@ -using rmq = Apache.Rocketmq.V2; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class Broker { - public Broker(rmq.Broker broker) + public Broker(Proto.Broker broker) { Name = broker.Name; Id = broker.Id; diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index a1c4b821..26ff9fcc 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -41,7 +41,7 @@ namespace Org.Apache.Rocketmq private readonly CancellationTokenSource _settingsSyncCtx; protected readonly ClientConfig ClientConfig; - protected readonly IClientManager Manager; + protected readonly IClientManager ClientManager; protected readonly string ClientId; protected readonly ConcurrentDictionary<string, bool> Topics; @@ -58,7 +58,7 @@ namespace Org.Apache.Rocketmq Topics = topics; ClientId = Utilities.GetClientId(); - Manager = new ClientManager(this); + ClientManager = new ClientManager(this); _topicRouteCache = new ConcurrentDictionary<string, TopicRouteData>(); @@ -91,7 +91,7 @@ namespace Org.Apache.Rocketmq _topicRouteUpdateCtx.Cancel(); _heartbeatCts.Cancel(); _telemetryCts.Cancel(); - await Manager.Shutdown(); + await ClientManager.Shutdown(); Logger.Debug($"Shutdown the rocketmq client successfully, clientId={ClientId}"); } @@ -120,7 +120,7 @@ namespace Org.Apache.Rocketmq return (false, session); } - var stream = Manager.Telemetry(endpoints); + var stream = ClientManager.Telemetry(endpoints); var created = new Session(endpoints, stream, this); _sessionsTable.Add(endpoints, created); return (true, created); @@ -134,7 +134,7 @@ namespace Org.Apache.Rocketmq protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest(); - protected abstract void OnTopicDataFetched0(string topic, TopicRouteData topicRouteData); + protected abstract void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData); private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData) @@ -158,7 +158,7 @@ namespace Org.Apache.Rocketmq } _topicRouteCache[topic] = topicRouteData; - OnTopicDataFetched0(topic, topicRouteData); + OnTopicRouteDataFetched0(topic, topicRouteData); } @@ -198,16 +198,26 @@ namespace Org.Apache.Rocketmq } } - private static void ScheduleWithFixedDelay(Action action, TimeSpan period, CancellationToken token) + private void ScheduleWithFixedDelay(Action action, TimeSpan period, CancellationToken token) { Task.Run(async () => { while (!token.IsCancellationRequested) { - action(); - await Task.Delay(period, token); + try + { + action(); + } + catch (Exception e) + { + Logger.Error(e, $"Failed to execute scheduled task, ClientId={ClientId}"); + } + finally + { + await Task.Delay(period, token); + } } - }); + }, token); } protected async Task<TopicRouteData> FetchTopicRoute(string topic) @@ -232,7 +242,7 @@ namespace Org.Apache.Rocketmq }; var response = - await Manager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout); + await ClientManager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout); var code = response.Status.Code; if (!Proto.Code.Ok.Equals(code)) { @@ -245,7 +255,7 @@ namespace Org.Apache.Rocketmq return new TopicRouteData(messageQueues); } - public async void Heartbeat() + private async void Heartbeat() { var endpoints = GetTotalRouteEndpoints(); if (0 == endpoints.Count) @@ -259,7 +269,7 @@ namespace Org.Apache.Rocketmq // Collect task into a map. foreach (var item in endpoints) { - var task = Manager.Heartbeat(item, request, ClientConfig.RequestTimeout); + var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout); responses[item]= task; } foreach (var item in responses.Keys) @@ -276,12 +286,14 @@ namespace Org.Apache.Rocketmq Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); } } + + public grpc.Metadata Sign() { var metadata = new grpc::Metadata(); - Signature.Sign(ClientConfig, metadata); + Signature.Sign(this, metadata); return metadata; } @@ -294,7 +306,7 @@ namespace Org.Apache.Rocketmq }; foreach (var item in endpoints) { - var response = await Manager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout); + var response = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout); try { StatusChecker.Check(response.Status, request); @@ -319,6 +331,11 @@ namespace Org.Apache.Rocketmq return ClientId; } + public ClientConfig GetClientConfig() + { + return ClientConfig; + } + public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command) { diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs index e5fd8643..7e434eae 100644 --- a/csharp/rocketmq-client-csharp/ClientConfig.cs +++ b/csharp/rocketmq-client-csharp/ClientConfig.cs @@ -22,13 +22,8 @@ namespace Org.Apache.Rocketmq { public class ClientConfig : IClientConfig { - private static long _instanceSequence = 0; - public ClientConfig(string endpoints) { - var hostName = System.Net.Dns.GetHostName(); - var pid = System.Diagnostics.Process.GetCurrentProcess().Id; - ClientId = $"{hostName}@{pid}@{Interlocked.Increment(ref _instanceSequence)}"; RequestTimeout = TimeSpan.FromSeconds(3); Endpoints = new Endpoints(endpoints); } @@ -37,9 +32,6 @@ namespace Org.Apache.Rocketmq public TimeSpan RequestTimeout { get; set; } - public string ClientId { get; } - - public Endpoints Endpoints { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs index d9622291..890ce877 100644 --- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs +++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + using System; using System.Threading.Tasks; using Grpc.Core; diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index bd18ebc4..3eef2fe6 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -21,7 +21,6 @@ using System.Threading; using System.Threading.Tasks; using grpc = Grpc.Core; using System.Collections.Generic; -using NLog; namespace Org.Apache.Rocketmq { diff --git a/csharp/rocketmq-client-csharp/ClientType.cs b/csharp/rocketmq-client-csharp/ClientType.cs index 15481c98..487b3bec 100644 --- a/csharp/rocketmq-client-csharp/ClientType.cs +++ b/csharp/rocketmq-client-csharp/ClientType.cs @@ -15,7 +15,7 @@ * limitations under the License. */ -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -28,19 +28,15 @@ namespace Org.Apache.Rocketmq public static class ClientTypeHelper { - public static rmq.ClientType ToProtobuf(ClientType clientType) + public static Proto.ClientType ToProtobuf(ClientType clientType) { - switch (clientType) + return clientType switch { - case ClientType.Producer: - return rmq.ClientType.Producer; - case ClientType.SimpleConsumer: - return rmq.ClientType.SimpleConsumer; - case ClientType.PushConsumer: - return rmq.ClientType.PushConsumer; - default: - return rmq.ClientType.Unspecified; - } + ClientType.Producer => Proto.ClientType.Producer, + ClientType.SimpleConsumer => Proto.ClientType.SimpleConsumer, + ClientType.PushConsumer => Proto.ClientType.PushConsumer, + _ => Proto.ClientType.Unspecified + }; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs index 73d05f63..7764dc34 100644 --- a/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs +++ b/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + using System.IO; using System; using System.Text.Json; diff --git a/csharp/rocketmq-client-csharp/Endpoints.cs b/csharp/rocketmq-client-csharp/Endpoints.cs index e7cf5f9c..54d8f0d2 100644 --- a/csharp/rocketmq-client-csharp/Endpoints.cs +++ b/csharp/rocketmq-client-csharp/Endpoints.cs @@ -1,19 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System; using System.Collections.Generic; using System.Linq; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class Endpoints : IEquatable<Endpoints> { private static readonly AddressListEqualityComparer AddressListComparer = new(); - private static readonly string EndpointSeparator = ":"; + private const string EndpointSeparator = ":"; private List<Address> Addresses { get; } private AddressScheme Scheme { get; } private readonly int _hashCode; - public Endpoints(global::Apache.Rocketmq.V2.Endpoints endpoints) + public Endpoints(Proto.Endpoints endpoints) { Addresses = new List<Address>(); foreach (var address in endpoints.Addresses) @@ -28,13 +45,14 @@ namespace Org.Apache.Rocketmq switch (endpoints.Scheme) { - case rmq.AddressScheme.Ipv4: + case Proto.AddressScheme.Ipv4: Scheme = AddressScheme.Ipv4; break; - case rmq.AddressScheme.Ipv6: + case Proto.AddressScheme.Ipv6: Scheme = AddressScheme.Ipv6; break; - case rmq.AddressScheme.DomainName: + case Proto.AddressScheme.DomainName: + case Proto.AddressScheme.Unspecified: default: Scheme = AddressScheme.DomainName; if (Addresses.Count > 1) @@ -123,9 +141,9 @@ namespace Org.Apache.Rocketmq return _hashCode; } - public rmq.Endpoints ToProtobuf() + public Proto.Endpoints ToProtobuf() { - var endpoints = new rmq.Endpoints(); + var endpoints = new Proto.Endpoints(); foreach (var address in Addresses) { endpoints.Addresses.Add(address.ToProtobuf()); diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs index e987d979..094c2607 100644 --- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs +++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs @@ -4,11 +4,11 @@ using Google.Protobuf.WellKnownTypes; namespace Org.Apache.Rocketmq { - public class ExponentialBackoffRetryPolicy : RetryPolicy + public class ExponentialBackoffRetryPolicy : IRetryPolicy { - private int _maxAttempts; + private readonly int _maxAttempts; - public ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff, + private ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff, double backoffMultiplier) { _maxAttempts = maxAttempts; @@ -17,7 +17,7 @@ namespace Org.Apache.Rocketmq BackoffMultiplier = backoffMultiplier; } - public int getMaxAttempts() + public int GetMaxAttempts() { return _maxAttempts; } @@ -28,17 +28,17 @@ namespace Org.Apache.Rocketmq public double BackoffMultiplier { get; } - public TimeSpan getNextAttemptDelay(int attempt) + public TimeSpan GetNextAttemptDelay(int attempt) { return TimeSpan.Zero; } - public static ExponentialBackoffRetryPolicy immediatelyRetryPolicy(int maxAttempts) + public static ExponentialBackoffRetryPolicy ImmediatelyRetryPolicy(int maxAttempts) { return new ExponentialBackoffRetryPolicy(maxAttempts, TimeSpan.Zero, TimeSpan.Zero, 1); } - public global::Apache.Rocketmq.V2.RetryPolicy toProtobuf() + public global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf() { var exponentialBackoff = new ExponentialBackoff { diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs index db219af9..5ba4c6f1 100644 --- a/csharp/rocketmq-client-csharp/IClient.cs +++ b/csharp/rocketmq-client-csharp/IClient.cs @@ -23,22 +23,44 @@ namespace Org.Apache.Rocketmq { public interface IClient { - void Heartbeat(); - - void NotifyClientTermination(Proto.Resource group); - CancellationTokenSource TelemetryCts(); + ClientConfig GetClientConfig(); + Proto.Settings GetSettings(); + /// <summary> + /// Get the identifier of current client. + /// </summary> + /// <returns>Client identifier.</returns> string GetClientId(); + /// <summary> + /// This method will be triggered when client settings is received from remote endpoints. + /// </summary> + /// <param name="endpoints"></param> + /// <param name="settings"></param> void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings); + /// <summary> + /// This method will be triggered when orphaned transaction need to be recovered. + /// </summary> + /// <param name="endpoints">Remote endpoints.</param> + /// <param name="command">Command of orphaned transaction recovery.</param> void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command); + /// <summary> + /// This method will be triggered when message verification command is received. + /// </summary> + /// <param name="endpoints">Remote endpoints.</param> + /// <param name="command">Command of message verification.</param> void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command); + /// <summary> + /// This method will be triggered when thread stack trace command is received. + /// </summary> + /// <param name="endpoints">Remote endpoints.</param> + /// <param name="command">Command of printing thread stack trace.</param> void OnPrintThreadStackTraceCommand(Endpoints endpoints, Proto.PrintThreadStackTraceCommand command); Metadata Sign(); diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs b/csharp/rocketmq-client-csharp/IClientConfig.cs index 5603a616..a50bdf93 100644 --- a/csharp/rocketmq-client-csharp/IClientConfig.cs +++ b/csharp/rocketmq-client-csharp/IClientConfig.cs @@ -15,21 +15,10 @@ * limitations under the License. */ -using System; - namespace Org.Apache.Rocketmq { public interface IClientConfig { - - ICredentialsProvider CredentialsProvider - { - get; - } - - string ClientId - { - get; - } + ICredentialsProvider CredentialsProvider { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index beb8880b..f2e48e36 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -19,15 +19,13 @@ using System.Threading.Tasks; using System; using System.Collections.Generic; using Apache.Rocketmq.V2; -using grpc = Grpc.Core; -using rmq = Apache.Rocketmq.V2; - +using Grpc.Core; namespace Org.Apache.Rocketmq { public interface IClientManager { - grpc::AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints); + AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints); Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout); diff --git a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs b/csharp/rocketmq-client-csharp/ICredentialsProvider.cs index 2f6e71eb..e98df14a 100644 --- a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs +++ b/csharp/rocketmq-client-csharp/ICredentialsProvider.cs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + namespace Org.Apache.Rocketmq { public interface ICredentialsProvider diff --git a/csharp/rocketmq-client-csharp/Message.cs b/csharp/rocketmq-client-csharp/Message.cs index fb004da3..9993b52f 100644 --- a/csharp/rocketmq-client-csharp/Message.cs +++ b/csharp/rocketmq-client-csharp/Message.cs @@ -20,14 +20,15 @@ using System.Collections.Generic; namespace Org.Apache.Rocketmq { - public class Message { public Message() : this(null, null) { } - public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { } + public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) + { + } public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) { @@ -35,7 +36,6 @@ namespace Org.Apache.Rocketmq public Message(string topic, string tag, List<string> keys, byte[] body) { - MaxAttemptTimes = 3; Topic = topic; Tag = tag; Keys = keys; @@ -44,73 +44,20 @@ namespace Org.Apache.Rocketmq DeliveryTimestamp = null; } - public string Topic - { - get; - set; - } + public string Topic { get; set; } - public byte[] Body - { - get; - set; - } + public byte[] Body { get; set; } - public string Tag - { - get; - set; - } + public string Tag { get; set; } - public List<string> Keys - { - get; - set; - } - public Dictionary<string, string> UserProperties - { - get; - set; - } + public List<string> Keys { get; set; } + public Dictionary<string, string> UserProperties { get; set; } - public int MaxAttemptTimes - { - get; - set; - } + public DateTime? DeliveryTimestamp { get; set; } - public DateTime? DeliveryTimestamp - { - get; - set; - } - - public int DeliveryAttempt - { - get; - internal set; - } - - public string MessageGroup - { - get; - set; - } - - public bool Fifo() - { - return !String.IsNullOrEmpty(MessageGroup); - } - - public bool Scheduled() - { - return DeliveryTimestamp > DateTime.UtcNow; - } + public int DeliveryAttempt { get; internal set; } - internal bool _checksumVerifiedOk = true; - internal string _receiptHandle; - internal string _sourceHost; + public string MessageGroup { get; set; } } - } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageException.cs b/csharp/rocketmq-client-csharp/MessageException.cs deleted file mode 100644 index 7ef10df3..00000000 --- a/csharp/rocketmq-client-csharp/MessageException.cs +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; - -namespace Org.Apache.Rocketmq -{ - [Serializable] - public class MessageException : Exception - { - public MessageException(string message) : base(message) - { - } - } -} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageIdGenerator.cs b/csharp/rocketmq-client-csharp/MessageIdGenerator.cs index 8dc370d1..60620ef0 100644 --- a/csharp/rocketmq-client-csharp/MessageIdGenerator.cs +++ b/csharp/rocketmq-client-csharp/MessageIdGenerator.cs @@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq */ public class MessageIdGenerator { - public static readonly string version = "01"; + public const string Version = "01"; private static readonly MessageIdGenerator Instance = new(); private readonly string _prefix; @@ -39,15 +39,15 @@ namespace Org.Apache.Rocketmq private MessageIdGenerator() { - MemoryStream stream = new MemoryStream(); - BinaryWriter writer = new BinaryWriter(stream); + var stream = new MemoryStream(); + var writer = new BinaryWriter(stream); var macAddress = Utilities.GetMacAddress(); writer.Write(macAddress, 0, 6); - int processId = Utilities.GetProcessId(); + var processId = Utilities.GetProcessId(); - byte[] processIdBytes = BitConverter.GetBytes(processId); + var processIdBytes = BitConverter.GetBytes(processId); if (BitConverter.IsLittleEndian) { Array.Reverse(processIdBytes); @@ -55,9 +55,9 @@ namespace Org.Apache.Rocketmq writer.Write(processIdBytes, 2, 2); var array = stream.ToArray(); - _prefix = version + Utilities.ByteArrayToHexString(array); + _prefix = Version + Utilities.ByteArrayToHexString(array); - DateTime epoch = new DateTime(2021, 1, 1, + var epoch = new DateTime(2021, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); var now = DateTime.Now; @@ -67,12 +67,12 @@ namespace Org.Apache.Rocketmq _sequence = 0; } - public String Next() + public string Next() { - long deltaSeconds = _secondsSinceCustomEpoch + _stopwatch.ElapsedMilliseconds / 1_000; + var deltaSeconds = _secondsSinceCustomEpoch + _stopwatch.ElapsedMilliseconds / 1_000; - MemoryStream stream = new MemoryStream(); - BinaryWriter writer = new BinaryWriter(stream); + var stream = new MemoryStream(); + var writer = new BinaryWriter(stream); byte[] deltaSecondsBytes = BitConverter.GetBytes(deltaSeconds); if (BitConverter.IsLittleEndian) @@ -82,8 +82,8 @@ namespace Org.Apache.Rocketmq writer.Write(deltaSecondsBytes, 4, 4); - int no = Interlocked.Increment(ref _sequence); - byte[] noBytes = BitConverter.GetBytes(no); + var no = Interlocked.Increment(ref _sequence); + var noBytes = BitConverter.GetBytes(no); if (BitConverter.IsLittleEndian) { Array.Reverse(noBytes); diff --git a/csharp/rocketmq-client-csharp/MessageQueue.cs b/csharp/rocketmq-client-csharp/MessageQueue.cs index 385e392c..cd6f0ce3 100644 --- a/csharp/rocketmq-client-csharp/MessageQueue.cs +++ b/csharp/rocketmq-client-csharp/MessageQueue.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System.Collections.Generic; using rmq = Apache.Rocketmq.V2; diff --git a/csharp/rocketmq-client-csharp/MessageType.cs b/csharp/rocketmq-client-csharp/MessageType.cs index 6338e365..09896a0b 100644 --- a/csharp/rocketmq-client-csharp/MessageType.cs +++ b/csharp/rocketmq-client-csharp/MessageType.cs @@ -16,7 +16,7 @@ */ using Org.Apache.Rocketmq.Error; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -30,38 +30,34 @@ namespace Org.Apache.Rocketmq public static class MessageTypeHelper { - public static MessageType FromProtobuf(rmq.MessageType messageType) + public static MessageType FromProtobuf(Proto.MessageType messageType) { switch (messageType) { - case rmq.MessageType.Normal: + case Proto.MessageType.Normal: return MessageType.Normal; - case rmq.MessageType.Fifo: + case Proto.MessageType.Fifo: return MessageType.Fifo; - case rmq.MessageType.Delay: + case Proto.MessageType.Delay: return MessageType.Delay; - case rmq.MessageType.Transaction: + case Proto.MessageType.Transaction: return MessageType.Transaction; + case Proto.MessageType.Unspecified: default: throw new InternalErrorException("MessageType is not specified"); } } - public static rmq.MessageType ToProtobuf(MessageType messageType) + public static Proto.MessageType ToProtobuf(MessageType messageType) { - switch (messageType) + return messageType switch { - case MessageType.Normal: - return rmq.MessageType.Normal; - case MessageType.Fifo: - return rmq.MessageType.Fifo; - case MessageType.Delay: - return rmq.MessageType.Delay; - case MessageType.Transaction: - return rmq.MessageType.Transaction; - default: - return rmq.MessageType.Unspecified; - } + MessageType.Normal => Proto.MessageType.Normal, + MessageType.Fifo => Proto.MessageType.Fifo, + MessageType.Delay => Proto.MessageType.Delay, + MessageType.Transaction => Proto.MessageType.Transaction, + _ => Proto.MessageType.Unspecified + }; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs index 57b573ac..26d7fcc6 100644 --- a/csharp/rocketmq-client-csharp/MessageView.cs +++ b/csharp/rocketmq-client-csharp/MessageView.cs @@ -79,7 +79,7 @@ namespace Org.Apache.Rocketmq public int DeliveryAttempt { get; } - public static MessageView fromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue) + public static MessageView FromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue) { var topic = message.Topic.Name; var systemProperties = message.SystemProperties; diff --git a/csharp/rocketmq-client-csharp/MqEncoding.cs b/csharp/rocketmq-client-csharp/MqEncoding.cs index ba2a489d..27dfb052 100644 --- a/csharp/rocketmq-client-csharp/MqEncoding.cs +++ b/csharp/rocketmq-client-csharp/MqEncoding.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using rmq = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq @@ -12,15 +29,12 @@ namespace Org.Apache.Rocketmq { public static rmq.Encoding ToProtobuf(MqEncoding mqEncoding) { - switch (mqEncoding) + return mqEncoding switch { - case MqEncoding.Gzip: - return rmq.Encoding.Gzip; - case MqEncoding.Identity: - return rmq.Encoding.Identity; - default: - return rmq.Encoding.Unspecified; - } + MqEncoding.Gzip => rmq.Encoding.Gzip, + MqEncoding.Identity => rmq.Encoding.Identity, + _ => rmq.Encoding.Unspecified + }; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MqLogManager.cs b/csharp/rocketmq-client-csharp/MqLogManager.cs index dcbdce57..7fa2b7bf 100644 --- a/csharp/rocketmq-client-csharp/MqLogManager.cs +++ b/csharp/rocketmq-client-csharp/MqLogManager.cs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + using System; using System.IO; using System.Reflection; diff --git a/csharp/rocketmq-client-csharp/Permission.cs b/csharp/rocketmq-client-csharp/Permission.cs index d5fe6348..eedd6247 100644 --- a/csharp/rocketmq-client-csharp/Permission.cs +++ b/csharp/rocketmq-client-csharp/Permission.cs @@ -48,32 +48,30 @@ namespace Org.Apache.Rocketmq } public static bool IsWritable(Permission permission) { - if (Permission.Write.Equals(permission)) - { - return true; - } - - if (Permission.ReadWrite.Equals(permission)) + switch (permission) { - return true; + case Permission.Write: + case Permission.ReadWrite: + return true; + case Permission.None: + case Permission.Read: + default: + return false; } - - return false; } public static bool IsReadable(Permission permission) { - if (Permission.Read.Equals(permission)) - { - return true; - } - - if (Permission.ReadWrite.Equals(permission)) + switch (permission) { - return true; + case Permission.Read: + case Permission.ReadWrite: + return true; + case Permission.None: + case Permission.Write: + default: + return false; } - - return false; } } diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 56ec5b40..c2782fc0 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -27,7 +44,7 @@ namespace Org.Apache.Rocketmq private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) : base(clientConfig, topics) { - var retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts); + var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, clientConfig.RequestTimeout, topics); _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>(); @@ -63,27 +80,37 @@ namespace Org.Apache.Rocketmq }; } - protected override void OnTopicDataFetched0(string topic, TopicRouteData topicRouteData) + private async Task<PublishingLoadBalancer> GetPublishingLoadBalancer(string topic) { + if (_publishingRouteDataCache.TryGetValue(topic, out var publishingLoadBalancer)) + { + return publishingLoadBalancer; + } + + var topicRouteData = await FetchTopicRoute(topic); + publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); + _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer); + + return publishingLoadBalancer; } - private RetryPolicy GetRetryPolicy() + protected override void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData) + { + var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); + _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer); + } + + private IRetryPolicy GetRetryPolicy() { return _publishingSettings.GetRetryPolicy(); } public async Task<SendReceipt> Send(Message message) { - if (!_publishingRouteDataCache.TryGetValue(message.Topic, out var publishingLoadBalancer)) - { - var topicRouteData = await FetchTopicRoute(message.Topic); - publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); - _publishingRouteDataCache.TryAdd(message.Topic, publishingLoadBalancer); - } - + var publishingLoadBalancer = await GetPublishingLoadBalancer(message.Topic); var publishingMessage = new PublishingMessage(message, _publishingSettings, false); var retryPolicy = GetRetryPolicy(); - var maxAttempts = retryPolicy.getMaxAttempts(); + var maxAttempts = retryPolicy.GetMaxAttempts(); var candidates = publishingLoadBalancer.TakeMessageQueues(publishingMessage.MessageGroup, maxAttempts); Exception exception = null; for (var attempt = 0; attempt < maxAttempts; attempt++) @@ -102,7 +129,7 @@ namespace Org.Apache.Rocketmq throw exception!; } - private Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq) + private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq) { return new Proto.SendMessageRequest { @@ -125,11 +152,11 @@ namespace Org.Apache.Rocketmq var sendMessageRequest = WrapSendMessageRequest(message, mq); var endpoints = mq.Broker.Endpoints; - Proto.SendMessageResponse response = - await Manager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); + var response = + await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); try { - var sendReceipts = SendReceipt.processSendMessageResponse(response); + var sendReceipts = SendReceipt.ProcessSendMessageResponse(response); var sendReceipt = sendReceipts.First(); if (attempt > 1) diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs b/csharp/rocketmq-client-csharp/PublishingMessage.cs index 93eb2de6..7839edaa 100644 --- a/csharp/rocketmq-client-csharp/PublishingMessage.cs +++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs @@ -19,7 +19,7 @@ using System; using System.IO; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; using Org.Apache.Rocketmq.Error; namespace Org.Apache.Rocketmq @@ -31,7 +31,7 @@ namespace Org.Apache.Rocketmq { public MessageType MessageType { set; get; } - public String MessageId { get; } + private string MessageId { get; } public PublishingMessage(Message message, PublishingSettings publishingSettings, bool txEnabled) : base( message.Topic, message.Body) @@ -45,7 +45,7 @@ namespace Org.Apache.Rocketmq // Generate message id. MessageId = MessageIdGenerator.GetInstance().Next(); // For NORMAL message. - if (String.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue && + if (string.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue && !txEnabled) { MessageType = MessageType.Normal; @@ -53,7 +53,7 @@ namespace Org.Apache.Rocketmq } // For FIFO message. - if (!String.IsNullOrEmpty(message.MessageGroup) && !txEnabled) + if (!string.IsNullOrEmpty(message.MessageGroup) && !txEnabled) { MessageType = MessageType.Fifo; return; @@ -67,18 +67,15 @@ namespace Org.Apache.Rocketmq } // For TRANSACTION message. - if (!String.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue && txEnabled) - { - MessageType = MessageType.Transaction; - return; - } - - throw new InternalErrorException("Transactional message should not set messageGroup or deliveryTimestamp"); + if (string.IsNullOrEmpty(message.MessageGroup) || message.DeliveryTimestamp.HasValue || !txEnabled) + throw new InternalErrorException( + "Transactional message should not set messageGroup or deliveryTimestamp"); + MessageType = MessageType.Transaction; } - public rmq::Message ToProtobuf(int queueId) + public Proto::Message ToProtobuf(int queueId) { - rmq.SystemProperties systemProperties = new rmq.SystemProperties + var systemProperties = new Proto.SystemProperties { Keys = { Keys }, MessageId = MessageId, @@ -103,11 +100,11 @@ namespace Org.Apache.Rocketmq systemProperties.MessageGroup = MessageGroup; } - rmq.Resource topicResource = new rmq.Resource + var topicResource = new Proto.Resource { Name = Topic }; - return new rmq.Message + return new Proto.Message { Topic = topicResource, Body = ByteString.CopyFrom(Body), diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs index b543cb71..023b0be3 100644 --- a/csharp/rocketmq-client-csharp/PublishingSettings.cs +++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -55,7 +72,7 @@ namespace Org.Apache.Rocketmq AccessPoint = AccessPoint.ToProtobuf(), ClientType = ClientTypeHelper.ToProtobuf(ClientType), RequestTimeout = Duration.FromTimeSpan(RequestTimeout), - BackoffPolicy = RetryPolicy.toProtobuf(), + BackoffPolicy = RetryPolicy.ToProtobuf(), UserAgent = UserAgent.Instance.ToProtobuf() }; } diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs index 5af67d1e..a1825f15 100644 --- a/csharp/rocketmq-client-csharp/Resource.cs +++ b/csharp/rocketmq-client-csharp/Resource.cs @@ -1,11 +1,11 @@ using System; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class Resource { - public Resource(rmq.Resource resource) + public Resource(Proto.Resource resource) { Namespace = resource.ResourceNamespace; Name = resource.Name; @@ -14,9 +14,9 @@ namespace Org.Apache.Rocketmq public string Namespace { get; } public string Name { get; } - public rmq.Resource ToProtobuf() + public Proto.Resource ToProtobuf() { - return new rmq.Resource + return new Proto.Resource { ResourceNamespace = Namespace, Name = Name diff --git a/csharp/rocketmq-client-csharp/RetryPolicy.cs b/csharp/rocketmq-client-csharp/RetryPolicy.cs index 9169b5d1..92b82013 100644 --- a/csharp/rocketmq-client-csharp/RetryPolicy.cs +++ b/csharp/rocketmq-client-csharp/RetryPolicy.cs @@ -2,12 +2,12 @@ using System; namespace Org.Apache.Rocketmq { - public interface RetryPolicy + public interface IRetryPolicy { - int getMaxAttempts(); + int GetMaxAttempts(); - TimeSpan getNextAttemptDelay(int attempt); + TimeSpan GetNextAttemptDelay(int attempt); - global::Apache.Rocketmq.V2.RetryPolicy toProtobuf(); + global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index a4ff1e3e..fa5c75c7 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -16,7 +16,8 @@ */ using System.Collections.Generic; -using rmq = Apache.Rocketmq.V2; +using System.Linq; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -34,12 +35,12 @@ namespace Org.Apache.Rocketmq return $"{nameof(MessageId)}: {MessageId}"; } - public static List<SendReceipt> processSendMessageResponse(rmq.SendMessageResponse response) + public static List<SendReceipt> ProcessSendMessageResponse(Proto.SendMessageResponse response) { - rmq.Status status = response.Status; + var status = response.Status; foreach (var entry in response.Entries) { - if (rmq.Code.Ok.Equals(entry.Status.Code)) + if (Proto.Code.Ok.Equals(entry.Status.Code)) { status = entry.Status; } @@ -47,13 +48,7 @@ namespace Org.Apache.Rocketmq // May throw exception. StatusChecker.Check(status, response); - List<SendReceipt> sendReceipts = new List<SendReceipt>(); - foreach (var entry in response.Entries) - { - sendReceipts.Add(new SendReceipt(entry.MessageId)); - } - - return sendReceipts; + return response.Entries.Select(entry => new SendReceipt(entry.MessageId)).ToList(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SequenceGenerator.cs b/csharp/rocketmq-client-csharp/SequenceGenerator.cs deleted file mode 100644 index 97a1eb91..00000000 --- a/csharp/rocketmq-client-csharp/SequenceGenerator.cs +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; -using System.Threading; -using System.Net.NetworkInformation; -using NLog; - -namespace Org.Apache.Rocketmq -{ - /** - * See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec. - * - * In the implementation layer, this class follows Singleton pattern. - */ - public sealed class SequenceGenerator - { - private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); - - public static SequenceGenerator Instance - { - get - { - return Nested.instance; - } - } - - private class Nested - { - static Nested() - { - - } - - internal static readonly SequenceGenerator instance = new SequenceGenerator(); - } - - private SequenceGenerator() - { - currentSecond = SecondsSinceCustomEpoch(); - macAddress = MacAddress(); - pidBytes = ToArray(pid); - if (BitConverter.IsLittleEndian) - { - Array.Reverse(version); - } - } - - /** - * Sequence version, 2 bytes. - */ - private static byte[] version = new byte[2] { 0x00, 0x01 }; - - /** - * MAC address, 6 bytes. - */ - private byte[] macAddress; - - private int sequenceInSecond = 0; - private int currentSecond; - - private static int pid = System.Diagnostics.Process.GetCurrentProcess().Id; - private static byte[] pidBytes; - - private static byte[] ToArray(int number) - { - byte[] bytes = BitConverter.GetBytes(number); - if (BitConverter.IsLittleEndian) - Array.Reverse(bytes); - return bytes; - } - - private static int SecondsSinceCustomEpoch() - { - var customEpoch = new DateTime(2021, 01, 01, 00, 00, 00, DateTimeKind.Utc); - var diff = DateTime.UtcNow.Subtract(customEpoch); - return (int)diff.TotalSeconds; - } - - private static byte[] MacAddress() - { - foreach (var nic in NetworkInterface.GetAllNetworkInterfaces()) - { - if (nic.OperationalStatus == OperationalStatus.Up) - { - if (nic.Name.StartsWith("lo")) - { - continue; - } - Logger.Debug($"NIC={nic.Name}"); - return nic.GetPhysicalAddress().GetAddressBytes(); - } - } - return null; - } - - public string Next() - { - byte[] data = new byte[18]; - Array.Copy(version, 0, data, 0, 2); - Array.Copy(macAddress, 0, data, 2, 6); - Array.Copy(pidBytes, 2, data, 8, 2); - int second = SecondsSinceCustomEpoch(); - if (second != currentSecond) - { - currentSecond = second; - Interlocked.Exchange(ref sequenceInSecond, 0); - } - byte[] secondBytes = ToArray(second); - Array.Copy(secondBytes, 0, data, 10, 4); - int sequence = Interlocked.Increment(ref sequenceInSecond); - byte[] sequenceBytes = ToArray(sequence); - Array.Copy(sequenceBytes, 0, data, 14, 4); - return BitConverter.ToString(data).Replace("-", ""); ; - } - } - -} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs index 82f4f1ef..99d61268 100644 --- a/csharp/rocketmq-client-csharp/Session.cs +++ b/csharp/rocketmq-client-csharp/Session.cs @@ -34,14 +34,14 @@ namespace Org.Apache.Rocketmq private readonly grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> _streamingCall; - private readonly Client _client; + private readonly IClient _client; private readonly Channel<bool> _channel; private readonly Endpoints _endpoints; private readonly SemaphoreSlim _semaphore; public Session(Endpoints endpoints, AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> streamingCall, - Client client) + IClient client) { _endpoints = endpoints; _semaphore = new SemaphoreSlim(1); @@ -80,21 +80,6 @@ namespace Org.Apache.Rocketmq } } - // public async void xx() - // { - // while (true) - // { - // var reader = _streamingCall.ResponseStream; - // if (await reader.MoveNext(_client.TelemetryCts().Token)) - // { - // var command = reader.Current; - // Console.WriteLine("xxxxxxxx"); - // Console.WriteLine(command); - // } - // } - // } - - private void Loop() { Task.Run(async () => diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs index e7ea4e92..7716fc2d 100644 --- a/csharp/rocketmq-client-csharp/Settings.cs +++ b/csharp/rocketmq-client-csharp/Settings.cs @@ -1,5 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -8,10 +25,10 @@ namespace Org.Apache.Rocketmq protected readonly string ClientId; protected readonly ClientType ClientType; protected readonly Endpoints AccessPoint; - protected volatile RetryPolicy RetryPolicy; + protected volatile IRetryPolicy RetryPolicy; protected readonly TimeSpan RequestTimeout; - public Settings(string clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy, + public Settings(string clientId, ClientType clientType, Endpoints accessPoint, IRetryPolicy retryPolicy, TimeSpan requestTimeout) { ClientId = clientId; @@ -30,11 +47,11 @@ namespace Org.Apache.Rocketmq RequestTimeout = requestTimeout; } - public abstract rmq::Settings ToProtobuf(); + public abstract Proto::Settings ToProtobuf(); - public abstract void Sync(rmq::Settings settings); + public abstract void Sync(Proto::Settings settings); - public RetryPolicy GetRetryPolicy() + public IRetryPolicy GetRetryPolicy() { return RetryPolicy; } diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs index e65125b6..8588c25a 100644 --- a/csharp/rocketmq-client-csharp/Signature.cs +++ b/csharp/rocketmq-client-csharp/Signature.cs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + using System; using System.Text; using grpc = Grpc.Core; @@ -23,43 +24,37 @@ namespace Org.Apache.Rocketmq { public static class Signature { - public static void Sign(IClientConfig clientConfig, grpc::Metadata metadata) + public static void Sign(IClient client, grpc::Metadata metadata) { + var clientConfig = client.GetClientConfig(); metadata.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue); metadata.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion); - metadata.Add(MetadataConstants.ClientIdKey, clientConfig.ClientId); - - string time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat); + metadata.Add(MetadataConstants.ClientIdKey, client.GetClientId()); + + var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat); metadata.Add(MetadataConstants.DateTimeKey, time); - if (null != clientConfig.CredentialsProvider) + var credentials = clientConfig.CredentialsProvider?.Credentials; + if (credentials == null || credentials.expired()) { - var credentials = clientConfig.CredentialsProvider.Credentials; - if (null == credentials || credentials.expired()) - { - return; - } - - if (!String.IsNullOrEmpty(credentials.SessionToken)) - { - metadata.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken); - } + return; + } - byte[] secretData = Encoding.ASCII.GetBytes(credentials.AccessSecret); - byte[] data = Encoding.ASCII.GetBytes(time); - HMACSHA1 signer = new HMACSHA1(secretData); - byte[] digest = signer.ComputeHash(data); - string hmac = BitConverter.ToString(digest).Replace("-", ""); - string authorization = string.Format("{0} {1}={2}, {3}={4}, {5}={6}", - MetadataConstants.AlgorithmKey, - MetadataConstants.CredentialKey, - credentials.AccessKey, - MetadataConstants.SignedHeadersKey, - MetadataConstants.DateTimeKey, - MetadataConstants.SignatureKey, - hmac); - metadata.Add(MetadataConstants.Authorization, authorization); + if (!string.IsNullOrEmpty(credentials.SessionToken)) + { + metadata.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken); } + + var secretData = Encoding.ASCII.GetBytes(credentials.AccessSecret); + var data = Encoding.ASCII.GetBytes(time); + var signer = new HMACSHA1(secretData); + var digest = signer.ComputeHash(data); + var hmac = BitConverter.ToString(digest).Replace("-", ""); + var authorization = $"{MetadataConstants.AlgorithmKey} " + + $"{MetadataConstants.CredentialKey}={credentials.AccessKey}, " + + $"{MetadataConstants.SignedHeadersKey}={MetadataConstants.DateTimeKey}, " + + $"{MetadataConstants.SignatureKey}={hmac}"; + metadata.Add(MetadataConstants.Authorization, authorization); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs index cf15c204..641fd097 100644 --- a/csharp/rocketmq-client-csharp/StatusChecker.cs +++ b/csharp/rocketmq-client-csharp/StatusChecker.cs @@ -28,7 +28,7 @@ namespace Org.Apache.Rocketmq public static void Check(Proto.Status status, IMessage message) { - Proto.Code statusCode = status.Code; + var statusCode = status.Code; var statusMessage = status.Message; switch (statusCode) diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs index cf803377..b77da833 100644 --- a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs +++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs @@ -15,7 +15,6 @@ * limitations under the License. */ -using System; using System.Collections.Generic; using System.Threading; using rmq = Apache.Rocketmq.V2; diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/TopicRouteException.cs index 75462fd3..c80e8699 100644 --- a/csharp/rocketmq-client-csharp/TopicRouteException.cs +++ b/csharp/rocketmq-client-csharp/TopicRouteException.cs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + using System; namespace Org.Apache.Rocketmq { diff --git a/csharp/tests/MessageIdGeneratorTest.cs b/csharp/tests/MessageIdGeneratorTest.cs index c98e1131..19d0cebd 100644 --- a/csharp/tests/MessageIdGeneratorTest.cs +++ b/csharp/tests/MessageIdGeneratorTest.cs @@ -29,11 +29,11 @@ namespace tests MessageIdGenerator instance = MessageIdGenerator.GetInstance(); var firstMessageId = instance.Next(); Assert.AreEqual(34, firstMessageId.Length); - Assert.AreEqual(MessageIdGenerator.version, firstMessageId.Substring(0, 2)); + Assert.AreEqual(MessageIdGenerator.Version, firstMessageId.Substring(0, 2)); var secondMessageId = instance.Next(); Assert.AreEqual(34, secondMessageId.Length); - Assert.AreEqual(MessageIdGenerator.version, secondMessageId.Substring(0, 2)); + Assert.AreEqual(MessageIdGenerator.Version, secondMessageId.Substring(0, 2)); Assert.AreNotEqual(firstMessageId, secondMessageId); Assert.AreEqual(firstMessageId.Substring(0, 24), secondMessageId.Substring(0, 24)); diff --git a/csharp/tests/SequenceGeneratorTest.cs b/csharp/tests/SequenceGeneratorTest.cs deleted file mode 100644 index 9b553346..00000000 --- a/csharp/tests/SequenceGeneratorTest.cs +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using Microsoft.VisualStudio.TestTools.UnitTesting; - -using System; -using System.Collections.Generic; - -namespace Org.Apache.Rocketmq -{ - [TestClass] - public class SequenceGeneratorTest - { - - [ClassInitialize] - public static void SetUp(TestContext context) - { - } - - [TestMethod] - public void testNext() - { - var set = new HashSet<string>(); - for (int i = 0; i < 500000; i++) - { - var nextId = SequenceGenerator.Instance.Next(); - if (set.Contains(nextId)) - { - Assert.Fail("SequenceGenerator violates uniqueness"); - } - set.Add(nextId); - } - } - } -} -
