This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 7bbdbca1ace6e31ad387f5bdfc90d5cf710fde78
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 17:38:44 2022 +0800

    Clean up code
---
 csharp/examples/Program.cs                      |  8 +--
 csharp/rocketmq-client-csharp/Client.cs         | 73 ++++++++++++-------------
 csharp/rocketmq-client-csharp/Producer.cs       |  4 +-
 csharp/rocketmq-client-csharp/PushConsumer.cs   |  6 +-
 csharp/rocketmq-client-csharp/SimpleConsumer.cs |  8 +--
 5 files changed, 48 insertions(+), 51 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index 28ad6f3..9eb0dd7 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -17,7 +17,6 @@
 using System;
 using System.Collections.Generic;
 using System.Threading.Tasks;
-using System.Threading;
 using Org.Apache.Rocketmq;
 
 namespace examples
@@ -26,15 +25,16 @@ namespace examples
     {
         static async Task Main(string[] args)
         {
-            string accessUrl = 
"rmq-cn-7mz2uk4nn0p.cn-hangzhou.rmq.aliyuncs.com:8080";
+            string accessUrl = 
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+            var topic = "sdk_standard";
             var credentialsProvider = new ConfigFileCredentialsProvider();
             var accessPoint = new AccessPoint(accessUrl);
             var producer = new Producer(accessPoint, "");
             producer.CredentialsProvider = credentialsProvider;
+            producer.AddTopicOfInterest(topic);
+            
             await producer.Start();
 
-            var topic = "sdk_standard";
-            
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             // Associate the message with one or multiple keys
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 32dffae..c7abc12 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -25,9 +25,6 @@ using rmq = Apache.Rocketmq.V2;
 using grpc = global::Grpc.Core;
 using NLog;
 using System.Diagnostics.Metrics;
-using OpenTelemetry;
-using OpenTelemetry.Metrics;
-
 
 namespace Org.Apache.Rocketmq
 {
@@ -37,7 +34,7 @@ namespace Org.Apache.Rocketmq
 
         protected Client(AccessPoint accessPoint, string resourceNamespace)
         {
-            _accessPoint = accessPoint;
+            AccessPoint = accessPoint;
 
             // Support IPv4 for now
             AccessPointScheme = rmq::AddressScheme.Ipv4;
@@ -48,19 +45,19 @@ namespace Org.Apache.Rocketmq
 
             _resourceNamespace = resourceNamespace;
 
-            _clientSettings = new rmq::Settings();
+            ClientSettings = new rmq::Settings();
 
-            _clientSettings.AccessPoint = new rmq::Endpoints();
-            _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
-            _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+            ClientSettings.AccessPoint = new rmq::Endpoints();
+            ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
 
-            _clientSettings.RequestTimeout = 
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+            ClientSettings.RequestTimeout = 
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
 
-            _clientSettings.UserAgent = new rmq.UA();
-            _clientSettings.UserAgent.Language = rmq::Language.DotNet;
-            _clientSettings.UserAgent.Version = "5.0.0";
-            _clientSettings.UserAgent.Platform = 
Environment.OSVersion.ToString();
-            _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            ClientSettings.UserAgent = new rmq.UA();
+            ClientSettings.UserAgent.Language = rmq::Language.DotNet;
+            ClientSettings.UserAgent.Version = "5.0.0";
+            ClientSettings.UserAgent.Platform = 
Environment.OSVersion.ToString();
+            ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
 
             Manager = ClientManagerFactory.getClientManager(resourceNamespace);
 
@@ -69,12 +66,12 @@ namespace Org.Apache.Rocketmq
 
             _healthCheckCts = new CancellationTokenSource();
 
-            telemetryCts_ = new CancellationTokenSource();
+            _telemetryCts = new CancellationTokenSource();
         }
 
         public virtual async Task Start()
         {
-            schedule(async () =>
+            Schedule(async () =>
             {
                 await UpdateTopicRoute();
 
@@ -83,8 +80,8 @@ namespace Org.Apache.Rocketmq
             // Get routes for topics of interest.
             await UpdateTopicRoute();
 
-            string accessPointUrl = _accessPoint.TargetUrl();
-            createSession(accessPointUrl);
+            string accessPointUrl = AccessPoint.TargetUrl();
+            CreateSession(accessPointUrl);
 
             await 
_sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
 
@@ -95,7 +92,7 @@ namespace Org.Apache.Rocketmq
         {
             Logger.Info($"Shutdown 
client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
-            telemetryCts_.Cancel();
+            _telemetryCts.Cancel();
             await Manager.Shutdown();
         }
 
@@ -138,7 +135,7 @@ namespace Org.Apache.Rocketmq
         private async Task UpdateTopicRoute()
         {
             HashSet<string> topics = new HashSet<string>();
-            foreach (var topic in topicsOfInterest_)
+            foreach (var topic in _topicsOfInterest)
             {
                 topics.Add(topic);
             }
@@ -192,7 +189,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public void schedule(Action action, int seconds, CancellationToken 
token)
+        public void Schedule(Action action, int seconds, CancellationToken 
token)
         {
             if (null == action)
             {
@@ -303,7 +300,7 @@ namespace Org.Apache.Rocketmq
 
         }
 
-        protected async Task<List<rmq::Assignment>> scanLoadAssignment(string 
topic, string group)
+        protected async Task<List<rmq::Assignment>> ScanLoadAssignment(string 
topic, string group)
         {
             // Pick a broker randomly
             string target = FilterBroker((s) => true);
@@ -345,10 +342,10 @@ namespace Org.Apache.Rocketmq
 
         public virtual void BuildClientSetting(rmq::Settings settings)
         {
-            settings.MergeFrom(_clientSettings);
+            settings.MergeFrom(ClientSettings);
         }
 
-        public void createSession(string url)
+        public void CreateSession(string url)
         {
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
@@ -447,24 +444,24 @@ namespace Org.Apache.Rocketmq
         {
             if (null != settings.Metric)
             {
-                _clientSettings.Metric = new rmq::Metric();
-                _clientSettings.Metric.MergeFrom(settings.Metric);
+                ClientSettings.Metric = new rmq::Metric();
+                ClientSettings.Metric.MergeFrom(settings.Metric);
             }
 
             if (null != settings.BackoffPolicy)
             {
-                _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
-                
_clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+                ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
+                ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
             }
         }
 
         protected readonly IClientManager Manager;
 
-        private readonly HashSet<string> topicsOfInterest_ = new 
HashSet<string>();
+        private readonly HashSet<string> _topicsOfInterest = new 
HashSet<string>();
 
         public void AddTopicOfInterest(string topic)
         {
-            topicsOfInterest_.Add(topic);
+            _topicsOfInterest.Add(topic);
         }
 
         private readonly ConcurrentDictionary<string, TopicRouteData> 
_topicRouteTable;
@@ -472,24 +469,24 @@ namespace Org.Apache.Rocketmq
 
         private readonly CancellationTokenSource _healthCheckCts;
 
-        private readonly CancellationTokenSource telemetryCts_ = new 
CancellationTokenSource();
+        private readonly CancellationTokenSource _telemetryCts;
 
         public CancellationTokenSource TelemetryCts()
         {
-            return telemetryCts_;
+            return _telemetryCts;
         }
 
-        protected readonly AccessPoint _accessPoint;
+        protected readonly AccessPoint AccessPoint;
 
         // This field is subject to change by servers.
-        protected readonly rmq::Settings _clientSettings;
+        protected readonly rmq::Settings ClientSettings;
 
         private readonly Random _random = new Random();
-        
-        protected readonly ConcurrentDictionary<string, Session> _sessions = 
new ConcurrentDictionary<string, Session>();
 
-        public static readonly string MeterName = "Apache.RocketMQ.Client";
-        
+        private readonly ConcurrentDictionary<string, Session> _sessions = new 
ConcurrentDictionary<string, Session>();
+
+        protected const string MeterName = "Apache.RocketMQ.Client";
+
         protected static readonly Meter MetricMeter = new(MeterName, "1.0");
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 37aebb9..333fc91 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -51,8 +51,8 @@ namespace Org.Apache.Rocketmq
                 .AddOtlpExporter(delegate(OtlpExporterOptions options, 
MetricReaderOptions readerOptions)
                 {
                     options.Protocol = OtlpExportProtocol.Grpc;
-                    options.Endpoint = new Uri(_accessPoint.TargetUrl());
-                    options.TimeoutMilliseconds = (int) 
_clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
+                    options.Endpoint = new Uri(AccessPoint.TargetUrl());
+                    options.TimeoutMilliseconds = (int) 
ClientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
 
                     
readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 
60 * 1000;
                 })
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs
index cc30943..30c3c8f 100644
--- a/csharp/rocketmq-client-csharp/PushConsumer.cs
+++ b/csharp/rocketmq-client-csharp/PushConsumer.cs
@@ -55,12 +55,12 @@ namespace Org.Apache.Rocketmq
             await Heartbeat();
 
             // Step-3: Scan load assignments that are assigned to current client
-            schedule(async () =>
+            Schedule(async () =>
             {
                 await scanLoadAssignments();
             }, 10, _scanAssignmentCTS.Token);
 
-            schedule(() =>
+            Schedule(() =>
             {
                 ScanExpiredProcessQueue();
             }, 10, _scanExpiredProcessQueueCTS.Token);
@@ -81,7 +81,7 @@ namespace Org.Apache.Rocketmq
             List<Task<List<rmq::Assignment>>> tasks = new 
List<Task<List<rmq::Assignment>>>();
             foreach (var item in _topicFilterExpressionMap)
             {
-                tasks.Add(scanLoadAssignment(item.Key, _group));
+                tasks.Add(ScanLoadAssignment(item.Key, _group));
             }
             var result = await Task.WhenAll(tasks);
 
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 154efa0..a0077ff 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq
             await base.Start();
             
             // Scan load assignment periodically
-            schedule(async () =>
+            Schedule(async () =>
             {
                 while (!_scanAssignmentCts.IsCancellationRequested)
                 {
@@ -100,13 +100,13 @@ namespace Org.Apache.Rocketmq
                 request.Endpoints = new rmq::Endpoints();
                 request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
                 var address = new rmq::Address();
-                address.Host = _accessPoint.Host;
-                address.Port = _accessPoint.Port;
+                address.Host = AccessPoint.Host;
+                address.Port = AccessPoint.Port;
                 request.Endpoints.Addresses.Add(address);
 
                 var metadata = new Metadata();
                 Signature.sign(this, metadata);
-                
tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, 
request, TimeSpan.FromSeconds(3)));
+                tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(), 
metadata, request, TimeSpan.FromSeconds(3)));
             }
 
             List<rmq.Assignment>[] list = await Task.WhenAll(tasks);

Reply via email to