This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch csharp_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 7bbdbca1ace6e31ad387f5bdfc90d5cf710fde78 Author: Zhanhui Li <[email protected]> AuthorDate: Tue Aug 30 17:38:44 2022 +0800 Clean up code --- csharp/examples/Program.cs | 8 +-- csharp/rocketmq-client-csharp/Client.cs | 73 ++++++++++++------------- csharp/rocketmq-client-csharp/Producer.cs | 4 +- csharp/rocketmq-client-csharp/PushConsumer.cs | 6 +- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 8 +-- 5 files changed, 48 insertions(+), 51 deletions(-) diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs index 28ad6f3..9eb0dd7 100644 --- a/csharp/examples/Program.cs +++ b/csharp/examples/Program.cs @@ -17,7 +17,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using System.Threading; using Org.Apache.Rocketmq; namespace examples @@ -26,15 +25,16 @@ namespace examples { static async Task Main(string[] args) { - string accessUrl = "rmq-cn-7mz2uk4nn0p.cn-hangzhou.rmq.aliyuncs.com:8080"; + string accessUrl = "rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080"; + var topic = "sdk_standard"; var credentialsProvider = new ConfigFileCredentialsProvider(); var accessPoint = new AccessPoint(accessUrl); var producer = new Producer(accessPoint, ""); producer.CredentialsProvider = credentialsProvider; + producer.AddTopicOfInterest(topic); + await producer.Start(); - var topic = "sdk_standard"; - byte[] body = new byte[1024]; Array.Fill(body, (byte)'x'); // Associate the message with one or multiple keys diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 32dffae..c7abc12 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -25,9 +25,6 @@ using rmq = Apache.Rocketmq.V2; using grpc = global::Grpc.Core; using NLog; using System.Diagnostics.Metrics; -using OpenTelemetry; -using OpenTelemetry.Metrics; - namespace Org.Apache.Rocketmq { @@ -37,7 +34,7 @@ namespace Org.Apache.Rocketmq protected Client(AccessPoint accessPoint, string resourceNamespace) { - _accessPoint = accessPoint; + AccessPoint = accessPoint; // Support IPv4 for now AccessPointScheme = rmq::AddressScheme.Ipv4; @@ -48,19 +45,19 @@ namespace Org.Apache.Rocketmq _resourceNamespace = resourceNamespace; - _clientSettings = new rmq::Settings(); + ClientSettings = new rmq::Settings(); - _clientSettings.AccessPoint = new rmq::Endpoints(); - _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4; - _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint); + ClientSettings.AccessPoint = new rmq::Endpoints(); + ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4; + ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint); - _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3)); + ClientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3)); - _clientSettings.UserAgent = new rmq.UA(); - _clientSettings.UserAgent.Language = rmq::Language.DotNet; - _clientSettings.UserAgent.Version = "5.0.0"; - _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString(); - _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName(); + ClientSettings.UserAgent = new rmq.UA(); + ClientSettings.UserAgent.Language = rmq::Language.DotNet; + ClientSettings.UserAgent.Version = "5.0.0"; + ClientSettings.UserAgent.Platform = Environment.OSVersion.ToString(); + ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName(); Manager = ClientManagerFactory.getClientManager(resourceNamespace); @@ -69,12 +66,12 @@ namespace Org.Apache.Rocketmq _healthCheckCts = new CancellationTokenSource(); - telemetryCts_ = new CancellationTokenSource(); + _telemetryCts = new CancellationTokenSource(); } public virtual async Task Start() { - schedule(async () => + Schedule(async () => { await UpdateTopicRoute(); @@ -83,8 +80,8 @@ namespace Org.Apache.Rocketmq // Get routes for topics of interest. await UpdateTopicRoute(); - string accessPointUrl = _accessPoint.TargetUrl(); - createSession(accessPointUrl); + string accessPointUrl = AccessPoint.TargetUrl(); + CreateSession(accessPointUrl); await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion(); @@ -95,7 +92,7 @@ namespace Org.Apache.Rocketmq { Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}"); _updateTopicRouteCts.Cancel(); - telemetryCts_.Cancel(); + _telemetryCts.Cancel(); await Manager.Shutdown(); } @@ -138,7 +135,7 @@ namespace Org.Apache.Rocketmq private async Task UpdateTopicRoute() { HashSet<string> topics = new HashSet<string>(); - foreach (var topic in topicsOfInterest_) + foreach (var topic in _topicsOfInterest) { topics.Add(topic); } @@ -192,7 +189,7 @@ namespace Org.Apache.Rocketmq } } - public void schedule(Action action, int seconds, CancellationToken token) + public void Schedule(Action action, int seconds, CancellationToken token) { if (null == action) { @@ -303,7 +300,7 @@ namespace Org.Apache.Rocketmq } - protected async Task<List<rmq::Assignment>> scanLoadAssignment(string topic, string group) + protected async Task<List<rmq::Assignment>> ScanLoadAssignment(string topic, string group) { // Pick a broker randomly string target = FilterBroker((s) => true); @@ -345,10 +342,10 @@ namespace Org.Apache.Rocketmq public virtual void BuildClientSetting(rmq::Settings settings) { - settings.MergeFrom(_clientSettings); + settings.MergeFrom(ClientSettings); } - public void createSession(string url) + public void CreateSession(string url) { var metadata = new grpc::Metadata(); Signature.sign(this, metadata); @@ -447,24 +444,24 @@ namespace Org.Apache.Rocketmq { if (null != settings.Metric) { - _clientSettings.Metric = new rmq::Metric(); - _clientSettings.Metric.MergeFrom(settings.Metric); + ClientSettings.Metric = new rmq::Metric(); + ClientSettings.Metric.MergeFrom(settings.Metric); } if (null != settings.BackoffPolicy) { - _clientSettings.BackoffPolicy = new rmq::RetryPolicy(); - _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy); + ClientSettings.BackoffPolicy = new rmq::RetryPolicy(); + ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy); } } protected readonly IClientManager Manager; - private readonly HashSet<string> topicsOfInterest_ = new HashSet<string>(); + private readonly HashSet<string> _topicsOfInterest = new HashSet<string>(); public void AddTopicOfInterest(string topic) { - topicsOfInterest_.Add(topic); + _topicsOfInterest.Add(topic); } private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable; @@ -472,24 +469,24 @@ namespace Org.Apache.Rocketmq private readonly CancellationTokenSource _healthCheckCts; - private readonly CancellationTokenSource telemetryCts_ = new CancellationTokenSource(); + private readonly CancellationTokenSource _telemetryCts; public CancellationTokenSource TelemetryCts() { - return telemetryCts_; + return _telemetryCts; } - protected readonly AccessPoint _accessPoint; + protected readonly AccessPoint AccessPoint; // This field is subject changes from servers. - protected readonly rmq::Settings _clientSettings; + protected readonly rmq::Settings ClientSettings; private readonly Random _random = new Random(); - - protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>(); - public static readonly string MeterName = "Apache.RocketMQ.Client"; - + private readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>(); + + protected const string MeterName = "Apache.RocketMQ.Client"; + protected static readonly Meter MetricMeter = new(MeterName, "1.0"); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 37aebb9..333fc91 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -51,8 +51,8 @@ namespace Org.Apache.Rocketmq .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions) { options.Protocol = OtlpExportProtocol.Grpc; - options.Endpoint = new Uri(_accessPoint.TargetUrl()); - options.TimeoutMilliseconds = (int) _clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds; + options.Endpoint = new Uri(AccessPoint.TargetUrl()); + options.TimeoutMilliseconds = (int) ClientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds; readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 60 * 1000; }) diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs index cc30943..30c3c8f 100644 --- a/csharp/rocketmq-client-csharp/PushConsumer.cs +++ b/csharp/rocketmq-client-csharp/PushConsumer.cs @@ -55,12 +55,12 @@ namespace Org.Apache.Rocketmq await Heartbeat(); // Step-3: Scan load assignments that are assigned to current client - schedule(async () => + Schedule(async () => { await scanLoadAssignments(); }, 10, _scanAssignmentCTS.Token); - schedule(() => + Schedule(() => { ScanExpiredProcessQueue(); }, 10, _scanExpiredProcessQueueCTS.Token); @@ -81,7 +81,7 @@ namespace Org.Apache.Rocketmq List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq::Assignment>>>(); foreach (var item in _topicFilterExpressionMap) { - tasks.Add(scanLoadAssignment(item.Key, _group)); + tasks.Add(ScanLoadAssignment(item.Key, _group)); } var result = await Task.WhenAll(tasks); diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index 154efa0..a0077ff 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq await base.Start(); // Scan load assignment periodically - schedule(async () => + Schedule(async () => { while (!_scanAssignmentCts.IsCancellationRequested) { @@ -100,13 +100,13 @@ namespace Org.Apache.Rocketmq request.Endpoints = new rmq::Endpoints(); request.Endpoints.Scheme = rmq.AddressScheme.Ipv4; var address = new rmq::Address(); - address.Host = _accessPoint.Host; - address.Port = _accessPoint.Port; + address.Host = AccessPoint.Host; + address.Port = AccessPoint.Port; request.Endpoints.Addresses.Add(address); var metadata = new Metadata(); Signature.sign(this, metadata); - tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3))); + tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3))); } List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
