This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 9da25a0  Complete SimpleConsumer example (#222)
9da25a0 is described below

commit 9da25a0f86ad426b75abece4d457357f2becd045
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Sep 1 18:53:20 2022 +0800

    Complete SimpleConsumer example (#222)
    
    * Clean up code
    
    * WIP: prepare to fix
    
    * WIP: bugfix
    
    * Complete simple consumer example
---
 csharp/examples/Program.cs                      |  70 +++++--
 csharp/rocketmq-client-csharp/AccessPoint.cs    |  28 +--
 csharp/rocketmq-client-csharp/Client.cs         | 265 ++++++++++++++----------
 csharp/rocketmq-client-csharp/ClientConfig.cs   |  24 +--
 csharp/rocketmq-client-csharp/ClientManager.cs  |  15 +-
 csharp/rocketmq-client-csharp/IClient.cs        |   6 +-
 csharp/rocketmq-client-csharp/IClientConfig.cs  |   4 -
 csharp/rocketmq-client-csharp/Producer.cs       |   4 +-
 csharp/rocketmq-client-csharp/RpcClient.cs      |   9 +-
 csharp/rocketmq-client-csharp/Session.cs        |  48 ++---
 csharp/rocketmq-client-csharp/Signature.cs      |  10 +-
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 232 ++++++++++++---------
 csharp/tests/ClientManagerTest.cs               |   2 +-
 csharp/tests/ProducerTest.cs                    | 138 ++++++------
 csharp/tests/RpcClientTest.cs                   |   6 +-
 csharp/tests/SignatureTest.cs                   |  11 +-
 csharp/tests/SimpleConsumerTest.cs              |  20 +-
 17 files changed, 498 insertions(+), 394 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index e0d3851..abc89ce 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -23,11 +23,13 @@ namespace examples
 {
     class Program
     {
-        private const string accessUrl = 
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
-        private const string standardTopic = "sdk_standard";
-        private const string fifoTopic = "sdk_fifo";
-        private const string timedTopic = "sdk_timed";
-        private const string transactionalTopic = "sdk_transactional";
+        private const string ACCESS_URL = 
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+        private const string STANDARD_TOPIC = "sdk_standard";
+        private const string FIFO_TOPIC = "sdk_fifo";
+        private const string TIMED_TOPIC = "sdk_timed";
+        private const string TRANSACTIONAL_TOPIC = "sdk_transactional";
+
+        private const string CONCURRENT_GROUP = "sdk_concurrency";
         
         private static async Task<SendReceipt> SendStandardMessage(Producer 
producer)
         {
@@ -40,7 +42,7 @@ namespace examples
                 "k2"
             };
             
-            var msg = new Message(standardTopic, body)
+            var msg = new Message(STANDARD_TOPIC, body)
             {
                 // Tag the massage. A message has at most one tag.
                 Tag = "Tag-0",
@@ -63,7 +65,7 @@ namespace examples
                 "k2"
             };
             
-            var msg = new Message(fifoTopic, body)
+            var msg = new Message(FIFO_TOPIC, body)
             {
                 // Tag the massage. A message has at most one tag.
                 Tag = "Tag-0",
@@ -89,7 +91,7 @@ namespace examples
                 "k2"
             };
             
-            var msg = new Message(timedTopic, body)
+            var msg = new Message(TIMED_TOPIC, body)
             {
                 // Tag the massage. A message has at most one tag.
                 Tag = "Tag-0",
@@ -100,27 +102,61 @@ namespace examples
             msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30);
             return await producer.Send(msg);
         }
+
+        private static async Task ConsumeAndAckMessages(SimpleConsumer 
simpleConsumer)
+        {
+            var messages = await simpleConsumer.Receive(32, 
TimeSpan.FromSeconds(60));
+            if (null != messages)
+            {
+                var tasks = new List<Task>();
+                foreach (var message in messages)
+                {
+                    Console.WriteLine($"Receive a message, 
topic={message.Topic}, message-id={message.MessageId}");
+                    var task = simpleConsumer.Ack(message);
+                    tasks.Add(task);
+                }
+                await Task.WhenAll(tasks);
+                Console.WriteLine($"{tasks.Count} messages have been 
acknowledged");
+            }
+        }
         
         static async Task Main(string[] args)
         {
             var credentialsProvider = new ConfigFileCredentialsProvider();
-            var producer = new Producer(accessUrl);
-            producer.CredentialsProvider = credentialsProvider;
-            producer.AddTopicOfInterest(standardTopic);
-            producer.AddTopicOfInterest(fifoTopic);
-            producer.AddTopicOfInterest(timedTopic);
-            producer.AddTopicOfInterest(transactionalTopic);
-
+            var producer = new Producer(ACCESS_URL)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            producer.AddTopicOfInterest(STANDARD_TOPIC);
+            producer.AddTopicOfInterest(FIFO_TOPIC);
+            producer.AddTopicOfInterest(TIMED_TOPIC);
+            producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
+            
             await producer.Start();
-
+            
             var sendReceiptOfStandardMessage = await 
SendStandardMessage(producer);
             Console.WriteLine($"Standard message-id: 
{sendReceiptOfStandardMessage.MessageId}");
-
+            
             var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
             Console.WriteLine($"FIFO message-id: 
{sendReceiptOfFifoMessage.MessageId}");
             
             var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
             Console.WriteLine($"Timed message-id: 
{sendReceiptOfTimedMessage.MessageId}");
+            
+            await producer.Shutdown();
+
+            Console.WriteLine("Now start a simple consumer");
+            var simpleConsumer = new SimpleConsumer(ACCESS_URL, 
CONCURRENT_GROUP)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            
+            simpleConsumer.Subscribe(STANDARD_TOPIC, new FilterExpression("*", 
ExpressionType.TAG));
+            await simpleConsumer.Start();
+
+            await ConsumeAndAckMessages(simpleConsumer);
+
+            await simpleConsumer.Shutdown();
 
             Console.ReadKey();
         }
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs 
b/csharp/rocketmq-client-csharp/AccessPoint.cs
index ab29273..f05fa29 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -37,35 +37,27 @@ namespace Org.Apache.Rocketmq
                 throw new ArgumentException("Access url should be of format 
host:port");
             }
 
-            _host = segments[0];
-            _port = Int32.Parse(segments[1]);
+            Host = segments[0];
+            Port = Int32.Parse(segments[1]);
         }
-        
-        private string _host;
 
-        public string Host
-        {
-            get { return _host; }
-            set { _host = value; }
-        }
+        public string Host { get; }
 
-        private int _port;
+        public int Port { get; set; }
 
-        public int Port
+        public string TargetUrl()
         {
-            get { return _port; }
-            set { _port = value; }
+            return $"https://{Host}:{Port}";;
         }
 
-        public string TargetUrl()
+        public rmq::AddressScheme HostScheme()
         {
-            return $"https://{_host}:{_port}";;
+            return SchemeOf(Host);
         }
 
-        public rmq::AddressScheme HostScheme()
+        private static rmq::AddressScheme SchemeOf(string host)
         {
-            IPAddress ip;
-            bool result = IPAddress.TryParse(_host, out ip);
+            var result = IPAddress.TryParse(host, out var ip);
             if (!result)
             {
                 return rmq::AddressScheme.DomainName;
diff --git a/csharp/rocketmq-client-csharp/Client.cs 
b/csharp/rocketmq-client-csharp/Client.cs
index 0fe4ec9..2e6a6ec 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -22,7 +22,7 @@ using System.Threading;
 using System.Diagnostics;
 using System;
 using rmq = Apache.Rocketmq.V2;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using NLog;
 using System.Diagnostics.Metrics;
 
@@ -37,34 +37,39 @@ namespace Org.Apache.Rocketmq
             AccessPoint = new AccessPoint(accessUrl);
 
             AccessPointScheme = AccessPoint.HostScheme();
-            var serviceEndpoint = new rmq::Address();
-            serviceEndpoint.Host = AccessPoint.Host;
-            serviceEndpoint.Port = AccessPoint.Port;
+            var serviceEndpoint = new rmq::Address
+            {
+                Host = AccessPoint.Host,
+                Port = AccessPoint.Port
+            };
             AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
 
             _resourceNamespace = "";
 
-            ClientSettings = new rmq::Settings();
+            ClientSettings = new rmq::Settings
+            {
+                AccessPoint = new rmq::Endpoints
+                {
+                    Scheme = AccessPoint.HostScheme()
+                }
+            };
 
-            ClientSettings.AccessPoint = new rmq::Endpoints();
-            ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
             ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
 
             ClientSettings.RequestTimeout = 
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
 
-            ClientSettings.UserAgent = new rmq.UA();
-            ClientSettings.UserAgent.Language = rmq::Language.DotNet;
-            ClientSettings.UserAgent.Version = "5.0.0";
-            ClientSettings.UserAgent.Platform = 
Environment.OSVersion.ToString();
-            ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            ClientSettings.UserAgent = new rmq.UA
+            {
+                Language = rmq::Language.DotNet,
+                Version = "5.0.0",
+                Platform = Environment.OSVersion.ToString(),
+                Hostname = System.Net.Dns.GetHostName()
+            };
 
-            _manager = new ClientManager();
+            Manager = new ClientManager();
 
             _topicRouteTable = new ConcurrentDictionary<string, 
TopicRouteData>();
             _updateTopicRouteCts = new CancellationTokenSource();
-
-            _healthCheckCts = new CancellationTokenSource();
-
             _telemetryCts = new CancellationTokenSource();
         }
 
@@ -72,36 +77,37 @@ namespace Org.Apache.Rocketmq
         {
             Schedule(async () =>
             {
+                Logger.Debug("Update topic route by schedule");
                 await UpdateTopicRoute();
 
             }, 30, _updateTopicRouteCts.Token);
 
             // Get routes for topics of interest.
+            Logger.Debug("Step of #Start: get route for topics of interest");
             await UpdateTopicRoute();
 
             string accessPointUrl = AccessPoint.TargetUrl();
             CreateSession(accessPointUrl);
-
             await 
_sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
-
+            Logger.Debug($"Session has been created for {accessPointUrl}");
             await Heartbeat();
         }
 
         public virtual async Task Shutdown()
         {
-            Logger.Info($"Shutdown 
client[resource-namespace={_resourceNamespace}");
+            Logger.Info($"Shutdown client");
             _updateTopicRouteCts.Cancel();
             _telemetryCts.Cancel();
-            await _manager.Shutdown();
+            await Manager.Shutdown();
         }
 
-        protected string FilterBroker(Func<string, bool> acceptor)
+        private string FilterBroker(Func<string, bool> acceptor)
         {
             foreach (var item in _topicRouteTable)
             {
                 foreach (var partition in item.Value.MessageQueues)
                 {
-                    string target = Utilities.TargetUrl(partition);
+                    var target = Utilities.TargetUrl(partition);
                     if (acceptor(target))
                     {
                         return target;
@@ -116,7 +122,7 @@ namespace Org.Apache.Rocketmq
          */
         private List<string> AvailableBrokerEndpoints()
         {
-            List<string> endpoints = new List<string>();
+            var endpoints = new List<string>();
             foreach (var item in _topicRouteTable)
             {
                 foreach (var partition in item.Value.MessageQueues)
@@ -149,7 +155,7 @@ namespace Org.Apache.Rocketmq
             List<string> topicList = new List<string>();
             topicList.AddRange(topics);
 
-            List<Task<TopicRouteData>> tasks = new 
List<Task<TopicRouteData>>();
+            var tasks = new List<Task<TopicRouteData>>();
             foreach (var item in topicList)
             {
                 tasks.Add(GetRouteFor(item, true));
@@ -188,7 +194,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public void Schedule(Action action, int seconds, CancellationToken 
token)
+        protected void Schedule(Action action, int seconds, CancellationToken 
token)
         {
             if (null == action)
             {
@@ -213,46 +219,51 @@ namespace Org.Apache.Rocketmq
          * direct
          *    Indicate if we should by-pass cache and fetch route entries from 
name server.
          */
-        public async Task<TopicRouteData> GetRouteFor(string topic, bool 
direct)
+        protected async Task<TopicRouteData> GetRouteFor(string topic, bool 
direct)
         {
+            Logger.Debug($"Get route for topic={topic}, direct={direct}");
             if (!direct && _topicRouteTable.ContainsKey(topic))
             {
+                Logger.Debug($"Return cached route for {topic}");
                 return _topicRouteTable[topic];
             }
 
             // We got one or more name servers available.
-            var request = new rmq::QueryRouteRequest();
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-            request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = AccessPointScheme;
+            var request = new rmq::QueryRouteRequest
+            {
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                },
+                Endpoints = new rmq::Endpoints
+                {
+                    Scheme = AccessPointScheme
+                }
+            };
             foreach (var address in AccessPointEndpoints)
             {
                 request.Endpoints.Addresses.Add(address);
             }
 
             var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
             int index = _random.Next(0, AccessPointEndpoints.Count);
             var serviceEndpoint = AccessPointEndpoints[index];
             // AccessPointAddresses.Count
             string target = 
$"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";;
-            TopicRouteData topicRouteData;
             try
             {
                 Logger.Debug($"Resolving route for topic={topic}");
-                topicRouteData = await _manager.ResolveRoute(target, metadata, 
request, RequestTimeout);
+                var topicRouteData = await Manager.ResolveRoute(target, 
metadata, request, RequestTimeout);
                 if (null != topicRouteData)
                 {
                     Logger.Debug($"Got route entries for {topic} from name 
server");
                     _topicRouteTable.TryAdd(topic, topicRouteData);
+                    Logger.Debug($"Got route for {topic} from {target}");
                     return topicRouteData;
                 }
-                else
-                {
-                    Logger.Warn($"Failed to query route of {topic} from 
{target}");
-                }
+                Logger.Warn($"Failed to query route of {topic} from {target}");
             }
             catch (Exception e)
             {
@@ -273,16 +284,20 @@ namespace Org.Apache.Rocketmq
                 return;
             }
 
-            var request = new rmq::HeartbeatRequest();
+            var request = new rmq::HeartbeatRequest
+            {
+                Group = null,
+                ClientType = rmq.ClientType.Unspecified
+            };
             PrepareHeartbeatData(request);
 
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
 
             List<Task> tasks = new List<Task>();
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(_manager.Heartbeat(endpoint, metadata, request, 
RequestTimeout));
+                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, 
RequestTimeout));
             }
 
             await Task.WhenAll(tasks);
@@ -303,15 +318,24 @@ namespace Org.Apache.Rocketmq
         {
             // Pick a broker randomly
             string target = FilterBroker((s) => true);
-            var request = new rmq::QueryAssignmentRequest();
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-            request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = AccessPointScheme;
+            var request = new rmq::QueryAssignmentRequest
+            {
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                },
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                Endpoints = new rmq::Endpoints
+                {
+                    Scheme = AccessPointScheme
+                }
+            };
+            
             foreach (var endpoint in AccessPointEndpoints)
             {
                 request.Endpoints.Addresses.Add(endpoint);
@@ -319,8 +343,8 @@ namespace Org.Apache.Rocketmq
             try
             {
                 var metadata = new grpc::Metadata();
-                Signature.sign(this, metadata);
-                return await _manager.QueryLoadAssignment(target, metadata, 
request, RequestTimeout);
+                Signature.Sign(this, metadata);
+                return await Manager.QueryLoadAssignment(target, metadata, 
request, RequestTimeout);
             }
             catch (System.Exception e)
             {
@@ -344,88 +368,102 @@ namespace Org.Apache.Rocketmq
             settings.MergeFrom(ClientSettings);
         }
 
-        public void CreateSession(string url)
+        private async Task CreateSession(string url)
         {
+            Logger.Debug($"Create session for url={url}");
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            var stream = _manager.Telemetry(url, metadata);
+            Signature.Sign(this, metadata);
+            var stream = Manager.Telemetry(url, metadata);
             var session = new Session(url, stream, this);
             _sessions.TryAdd(url, session);
-            Task.Run(async () =>
-            {
-                await session.Loop();
-            });
+            await session.Loop();
         }
 
-
-        public async Task<List<Message>> ReceiveMessage(rmq::Assignment 
assignment, string group)
+        internal async Task<List<Message>> ReceiveMessage(rmq::Assignment 
assignment, string group)
         {
             var targetUrl = TargetUrl(assignment);
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            var request = new rmq::ReceiveMessageRequest();
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-            request.MessageQueue = assignment.MessageQueue;
-            var messages = await _manager.ReceiveMessage(targetUrl, metadata, 
request, getLongPollingTimeout());
+            Signature.Sign(this, metadata);
+            var request = new rmq::ReceiveMessageRequest
+            {
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                MessageQueue = assignment.MessageQueue
+            };
+            var messages = await Manager.ReceiveMessage(targetUrl, metadata, 
request, 
+                ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
             return messages;
         }
 
         public async Task<Boolean> Ack(string target, string group, string 
topic, string receiptHandle, String messageId)
         {
-            var request = new rmq::AckMessageRequest();
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-
-            var entry = new rmq::AckMessageEntry();
-            entry.ReceiptHandle = receiptHandle;
-            entry.MessageId = messageId;
+            var request = new rmq::AckMessageRequest
+            {
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                }
+            };
+
+            var entry = new rmq::AckMessageEntry
+            {
+                ReceiptHandle = receiptHandle,
+                MessageId = messageId
+            };
             request.Entries.Add(entry);
 
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            return await _manager.Ack(target, metadata, request, 
RequestTimeout);
+            Signature.Sign(this, metadata);
+            return await Manager.Ack(target, metadata, request, 
RequestTimeout);
         }
 
         public async Task<Boolean> ChangeInvisibleDuration(string target, 
string group, string topic, string receiptHandle, String messageId)
         {
-            var request = new rmq::ChangeInvisibleDurationRequest();
-            request.ReceiptHandle = receiptHandle;
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-
-            request.MessageId = messageId;
+            var request = new rmq::ChangeInvisibleDurationRequest
+            {
+                ReceiptHandle = receiptHandle,
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                },
+                MessageId = messageId
+            };
 
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            return await _manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            return await Manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
         }
 
-        public async Task<bool> NotifyClientTermination()
+        public async Task<bool> NotifyClientTermination(rmq.Resource group)
         {
             List<string> endpoints = AvailableBrokerEndpoints();
-            var request = new rmq::NotifyClientTerminationRequest();
-
-
+            var request = new rmq::NotifyClientTerminationRequest
+            {
+                Group = group
+            };
             var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
 
             List<Task<Boolean>> tasks = new List<Task<Boolean>>();
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(_manager.NotifyClientTermination(endpoint, metadata, 
request, RequestTimeout));
+                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, 
request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
@@ -439,7 +477,7 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        public virtual void OnSettingsReceived(rmq::Settings settings)
+        internal virtual void OnSettingsReceived(rmq::Settings settings)
         {
             if (null != settings.Metric)
             {
@@ -452,9 +490,24 @@ namespace Org.Apache.Rocketmq
                 ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
                 ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
             }
+
+            switch (settings.PubSubCase)
+            {
+                case rmq.Settings.PubSubOneofCase.Publishing:
+                {
+                    ClientSettings.Publishing = settings.Publishing;
+                    break;
+                }
+
+                case rmq.Settings.PubSubOneofCase.Subscription:
+                {
+                    ClientSettings.Subscription = settings.Subscription;
+                    break;
+                }
+            }
         }
 
-        protected readonly IClientManager _manager;
+        protected readonly IClientManager Manager;
 
         private readonly HashSet<string> _topicsOfInterest = new 
HashSet<string>();
 
@@ -465,9 +518,7 @@ namespace Org.Apache.Rocketmq
 
         private readonly ConcurrentDictionary<string, TopicRouteData> 
_topicRouteTable;
         private readonly CancellationTokenSource _updateTopicRouteCts;
-
-        private readonly CancellationTokenSource _healthCheckCts;
-
+        
         private readonly CancellationTokenSource _telemetryCts;
 
         public CancellationTokenSource TelemetryCts()
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs 
b/csharp/rocketmq-client-csharp/ClientConfig.cs
index 0d99cb1..86175a2 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -75,15 +75,6 @@ namespace Org.Apache.Rocketmq
             set { credentialsProvider_ = value; }
         }
 
-        public string tenantId()
-        {
-            return _tenantId;
-        }
-        public string TenantId
-        {
-            set { _tenantId = value; }
-        }
-
         public TimeSpan RequestTimeout
         {
             get
@@ -96,15 +87,6 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public TimeSpan getLongPollingTimeout()
-        {
-            return longPollingIoTimeout_;
-        }
-        public TimeSpan LongPollingTimeout
-        {
-            set { longPollingIoTimeout_ = value; }
-        }
-
         public string getGroupName()
         {
             return groupName_;
@@ -139,9 +121,7 @@ namespace Org.Apache.Rocketmq
         protected string _resourceNamespace;
 
         private ICredentialsProvider credentialsProvider_;
-
-        private string _tenantId;
-
+        
         private TimeSpan _requestTimeout;
 
         private TimeSpan longPollingIoTimeout_;
@@ -150,7 +130,7 @@ namespace Org.Apache.Rocketmq
 
         private string clientId_;
 
-        private bool tracingEnabled_ = false;
+        private bool tracingEnabled_;
 
         private string instanceName_ = "default";
 
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs 
b/csharp/rocketmq-client-csharp/ClientManager.cs
index a39a0e0..c2e00e8 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -191,6 +191,11 @@ namespace Org.Apache.Rocketmq
                                 Logger.Warn("TooManyRequest: servers 
throttled");
                                 break;
                             }
+                            case rmq.Code.MessageNotFound:
+                            {
+                                Logger.Info("No message is found in the 
server");
+                                break;
+                            }
                             default:
                             {
                                 Logger.Warn("Unknown error status");
@@ -221,10 +226,12 @@ namespace Org.Apache.Rocketmq
 
         private Message Convert(string sourceHost, rmq::Message message)
         {
-            var msg = new Message();
-            msg.Topic = message.Topic.Name;
-            msg.MessageId = message.SystemProperties.MessageId;
-            msg.Tag = message.SystemProperties.Tag;
+            var msg = new Message
+            {
+                Topic = message.Topic.Name,
+                MessageId = message.SystemProperties.MessageId,
+                Tag = message.SystemProperties.Tag
+            };
 
             // Validate message body checksum
             byte[] raw = message.Body.ToByteArray();
diff --git a/csharp/rocketmq-client-csharp/IClient.cs 
b/csharp/rocketmq-client-csharp/IClient.cs
index 3352028..b1e992a 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -27,12 +27,10 @@ namespace Org.Apache.Rocketmq
 
         Task Heartbeat();
 
-        Task<bool> NotifyClientTermination();
+        Task<bool> NotifyClientTermination(rmq.Resource group);
 
         void BuildClientSetting(rmq::Settings settings);
-
-
-        void OnSettingsReceived(rmq::Settings settings);
+        
 
         CancellationTokenSource TelemetryCts();
     }
diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs 
b/csharp/rocketmq-client-csharp/IClientConfig.cs
index 438d7a8..57325b4 100644
--- a/csharp/rocketmq-client-csharp/IClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/IClientConfig.cs
@@ -28,10 +28,6 @@ namespace Org.Apache.Rocketmq
 
         ICredentialsProvider credentialsProvider();
 
-        string tenantId();
-
-        TimeSpan getLongPollingTimeout();
-
         string getGroupName();
 
         string clientId();
diff --git a/csharp/rocketmq-client-csharp/Producer.cs 
b/csharp/rocketmq-client-csharp/Producer.cs
index e337b1a..7d55904 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -161,7 +161,7 @@ namespace Org.Apache.Rocketmq
             }
 
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
 
             Exception ex = null;
 
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
                 {
                     var stopWatch = new Stopwatch();
                     stopWatch.Start();
-                    rmq::SendMessageResponse response = await 
_manager.SendMessage(target, metadata, request, RequestTimeout);
+                    rmq::SendMessageResponse response = await 
Manager.SendMessage(target, metadata, request, RequestTimeout);
                     if (null != response && rmq::Code.Ok == 
response.Status.Code)
                     {
                         var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs 
b/csharp/rocketmq-client-csharp/RpcClient.cs
index c1f1cd6..50cd8e9 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -34,9 +34,11 @@ namespace Org.Apache.Rocketmq
         protected static readonly Logger Logger = 
MqLogManager.Instance.GetCurrentClassLogger();
         private readonly rmq::MessagingService.MessagingServiceClient _stub;
         private readonly GrpcChannel _channel;
+        private readonly string _target;
 
         public RpcClient(string target)
         {
+            _target = target;
             _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
             {
                 HttpHandler = CreateHttpHandler()
@@ -76,7 +78,7 @@ namespace Org.Apache.Rocketmq
 
         public AsyncDuplexStreamingCall<rmq::TelemetryCommand, 
rmq::TelemetryCommand> Telemetry(Metadata metadata)
         {
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+            var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650));
             var callOptions = new CallOptions(metadata, deadline);
             return _stub.Telemetry(callOptions);
         }
@@ -125,15 +127,16 @@ namespace Org.Apache.Rocketmq
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
             var call = _stub.ReceiveMessage(request, callOptions);
+            Logger.Debug($"ReceiveMessageRequest has been written to 
{_target}");
             var result = new List<rmq::ReceiveMessageResponse>();
             var stream = call.ResponseStream;
             while (await stream.MoveNext())
             {
                 var entry = stream.Current;
-                Logger.Debug($"Got ReceiveMessageResponse {entry}");
+                Logger.Debug($"Got ReceiveMessageResponse {entry} from 
{_target}");
                 result.Add(entry);
             }
-            Logger.Debug($"Receiving of messages completed");
+            Logger.Debug($"Receiving messages from {_target} completed");
             return result;
         }
 
diff --git a/csharp/rocketmq-client-csharp/Session.cs 
b/csharp/rocketmq-client-csharp/Session.cs
index a6be057..4d09894 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -18,7 +18,7 @@
 using System.Threading;
 using System.Threading.Channels;
 using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using NLog;
 using rmq = Apache.Rocketmq.V2;
 
@@ -28,31 +28,33 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = 
MqLogManager.Instance.GetCurrentClassLogger();
 
-        public Session(string target,
+        public Session(string target, 
             grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, 
rmq::TelemetryCommand> stream,
-            IClient client)
+            Client client)
         {
-            this._target = target;
-            this._stream = stream;
-            this._client = client;
-            this._channel = Channel.CreateUnbounded<bool>();
+            _target = target;
+            _stream = stream;
+            _client = client;
+            _channel = Channel.CreateUnbounded<bool>();
         }
 
         public async Task Loop()
         {
-            var reader = this._stream.ResponseStream;
-            var writer = this._stream.RequestStream;
-            var request = new rmq::TelemetryCommand();
-            request.Settings = new rmq::Settings();
+            var reader = _stream.ResponseStream;
+            var writer = _stream.RequestStream;
+            var request = new rmq::TelemetryCommand
+            {
+                Settings = new rmq::Settings()
+            };
             _client.BuildClientSetting(request.Settings);
             await writer.WriteAsync(request);
-            Logger.Debug($"Writing Client Settings Done: 
{request.Settings.ToString()}");
+            Logger.Debug($"Writing Client Settings to {_target} Done: 
{request.Settings}");
             while (!_client.TelemetryCts().IsCancellationRequested)
             {
                 if (await reader.MoveNext(_client.TelemetryCts().Token))
                 {
                     var cmd = reader.Current;
-                    Logger.Debug($"Received a TelemetryCommand: 
{cmd.ToString()}");
+                    Logger.Debug($"Received a TelemetryCommand from {_target}: 
{cmd}");
                     switch (cmd.CommandCase)
                     {
                         case rmq::TelemetryCommand.CommandOneofCase.None:
@@ -71,7 +73,7 @@ namespace Org.Apache.Rocketmq
                                     await _channel.Writer.WriteAsync(true);
                                 }
 
-                                Logger.Info($"Received settings from server 
{cmd.Settings.ToString()}");
+                                Logger.Info($"Received settings from 
{_target}: {cmd.Settings}");
                                 _client.OnSettingsReceived(cmd.Settings);
                                 break;
                             }
@@ -90,17 +92,10 @@ namespace Org.Apache.Rocketmq
                     }
                 }
             }
-            Logger.Info("Telemetry stream cancelled");
+            Logger.Info($"Telemetry stream for {_target} is cancelled");
             await writer.CompleteAsync();
         }
 
-        private string _target;
-
-        public string Target
-        {
-            get { return _target; }
-        }
-
         public async Task AwaitSettingNegotiationCompletion()
         {
             if (0 != Interlocked.Read(ref _established))
@@ -112,11 +107,12 @@ namespace Org.Apache.Rocketmq
             await _channel.Reader.ReadAsync();
         }
 
-        private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, 
rmq::TelemetryCommand> _stream;
-        private IClient _client;
+        private readonly grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, 
rmq::TelemetryCommand> _stream;
+        private readonly Client _client;
 
-        private long _established = 0;
+        private long _established;
 
-        private Channel<bool> _channel;
+        private readonly Channel<bool> _channel;
+        private readonly string _target;
     };
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs 
b/csharp/rocketmq-client-csharp/Signature.cs
index 2331b53..ec78171 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -16,22 +16,18 @@
  */
 using System;
 using System.Text;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using System.Security.Cryptography;
 
 namespace Org.Apache.Rocketmq
 {
-    public class Signature
+    public static class Signature
     {
-        public static void sign(IClientConfig clientConfig, grpc::Metadata 
metadata)
+        public static void Sign(IClientConfig clientConfig, grpc::Metadata 
metadata)
         {
             metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
             metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
             metadata.Add(MetadataConstants.CLIENT_ID_KEY, 
clientConfig.clientId());
-            if (!String.IsNullOrEmpty(clientConfig.tenantId()))
-            {
-                metadata.Add(MetadataConstants.TENANT_ID_KEY, 
clientConfig.tenantId());
-            }
 
             if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
             {
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs 
b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index d4694ac..60f0ba9 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -33,7 +33,6 @@ namespace Org.Apache.Rocketmq
         public SimpleConsumer(string accessUrl, string group)
         : base(accessUrl)
         {
-            _fifo = false;
             _subscriptions = new ConcurrentDictionary<string, 
rmq.SubscriptionEntry>();
             _topicAssignments = new ConcurrentDictionary<string, 
List<rmq.Assignment>>();
             _group = group;
@@ -44,15 +43,20 @@ namespace Org.Apache.Rocketmq
             base.BuildClientSetting(settings);
 
             settings.ClientType = rmq::ClientType.SimpleConsumer;
-            settings.Subscription = new rmq::Subscription();
-            settings.Subscription.Group = new rmq::Resource();
-            settings.Subscription.Group.Name = _group;
-            settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+            settings.Subscription = new rmq::Subscription
+            {
+                Group = new rmq::Resource
+                {
+                    Name = _group,
+                    ResourceNamespace = ResourceNamespace
+                }
+            };
 
             foreach (var kv in _subscriptions)
             {
                 settings.Subscription.Subscriptions.Add(kv.Value);
             }
+            Logger.Info($"ClientSettings built OK. {settings}");
         }
 
         public override async Task Start()
@@ -62,54 +66,70 @@ namespace Org.Apache.Rocketmq
             // Scan load assignment periodically
             Schedule(async () =>
             {
-                while (!_scanAssignmentCts.IsCancellationRequested)
-                {
-                    await ScanLoadAssignments();                    
-                }
+                Logger.Debug("Scan load assignments by schedule");
+                await ScanLoadAssignments();
             }, 30, _scanAssignmentCts.Token);
 
             await ScanLoadAssignments();
+            Logger.Debug("Step of #Start: ScanLoadAssignments completed");
         }
 
         public override async Task Shutdown()
         {
+            _scanAssignmentCts.Cancel();
             await base.Shutdown();
-            if (!await NotifyClientTermination())
+            var group = new rmq.Resource()
+            {
+                Name = _group,
+                ResourceNamespace = "",
+            };
+            if (!await NotifyClientTermination(group))
             {
                 Logger.Warn("Failed to NotifyClientTermination");
             }
         }
-
+        
+        /**
+         * For 5.x, we can assume there is a load-balancer before gateway 
nodes.
+         */
         private async Task ScanLoadAssignments()
         {
-
-            List<Task<List<rmq::Assignment>>> tasks = new 
List<Task<List<rmq.Assignment>>>();
-            List<string> topics = new List<string>();
+            var tasks = new List<Task<List<rmq.Assignment>>>();
+            var topics = new List<string>();
             foreach (var sub in _subscriptions)
             {
-                var request = new rmq::QueryAssignmentRequest();
-                request.Topic = new rmq::Resource();
-                request.Topic.ResourceNamespace = ResourceNamespace;
-                request.Topic.Name = sub.Key;
+                var request = new rmq::QueryAssignmentRequest
+                {
+                    Topic = new rmq::Resource
+                    {
+                        ResourceNamespace = ResourceNamespace,
+                        Name = sub.Key
+                    }
+                };
                 topics.Add(sub.Key);
-                request.Group = new rmq::Resource();
-                request.Group.Name = _group;
-                request.Group.ResourceNamespace = ResourceNamespace;
+                request.Group = new rmq::Resource
+                {
+                    Name = _group,
+                    ResourceNamespace = ResourceNamespace
+                };
 
-                request.Endpoints = new rmq::Endpoints();
-                request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
-                var address = new rmq::Address();
-                address.Host = AccessPoint.Host;
-                address.Port = AccessPoint.Port;
+                request.Endpoints = new rmq::Endpoints
+                {
+                    Scheme = AccessPoint.HostScheme()
+                };
+                var address = new rmq::Address
+                {
+                    Host = AccessPoint.Host,
+                    Port = AccessPoint.Port
+                };
                 request.Endpoints.Addresses.Add(address);
 
                 var metadata = new Metadata();
-                Signature.sign(this, metadata);
-                
tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, 
request, TimeSpan.FromSeconds(3)));
+                Signature.Sign(this, metadata);
+                tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(), 
metadata, request, TimeSpan.FromSeconds(3)));
             }
 
-            List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
-
+            var list = await Task.WhenAll(tasks);
             var i = 0;
             foreach (var assignments in list)
             {
@@ -129,36 +149,48 @@ namespace Org.Apache.Rocketmq
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest 
request)
         {
             request.ClientType = rmq::ClientType.SimpleConsumer;
-            request.Group = new rmq::Resource();
-            request.Group.Name = _group;
-            request.Group.ResourceNamespace = ResourceNamespace;
+            request.Group = new rmq::Resource
+            {
+                Name = _group,
+                ResourceNamespace = ResourceNamespace
+            };
         }
 
-        public void Subscribe(string topic, rmq::FilterType filterType, string 
expression)
+        public void Subscribe(string topic, FilterExpression filterExpression)
         {
-            var entry = new rmq::SubscriptionEntry();
-            entry.Topic = new rmq::Resource();
-            entry.Topic.Name = topic;
-            entry.Topic.ResourceNamespace = ResourceNamespace;
-            entry.Expression = new rmq::FilterExpression();
-            entry.Expression.Type = filterType;
-            entry.Expression.Expression = expression;
+            var entry = new rmq::SubscriptionEntry
+            {
+                Topic = new rmq::Resource
+                {
+                    Name = topic,
+                    ResourceNamespace = ResourceNamespace
+                },
+                Expression = new rmq::FilterExpression
+                {
+                    Type = filterExpression.Type switch {
+                        ExpressionType.TAG => rmq::FilterType.Tag,
+                        ExpressionType.SQL92 => rmq::FilterType.Sql,
+                        _ => rmq.FilterType.Tag
+                    },
+                    Expression = filterExpression.Expression
+                }
+            };
+
             _subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
             AddTopicOfInterest(topic);
         }
 
-        public override void OnSettingsReceived(rmq.Settings settings)
+        internal override void OnSettingsReceived(rmq.Settings settings)
         {
             base.OnSettingsReceived(settings);
 
             if (settings.Subscription.Fifo)
             {
-                _fifo = true;
                 Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
             }
         }
 
-        public async Task<List<Message>> Receive(int batchSize, TimeSpan 
timeout)
+        public async Task<List<Message>> Receive(int batchSize, TimeSpan 
invisibleDuration)
         {
             var messageQueue = NextQueue();
             if (null == messageQueue)
@@ -167,37 +199,48 @@ namespace Org.Apache.Rocketmq
                 return new List<Message>();
             }
 
-            var request = new rmq.ReceiveMessageRequest();
-            request.Group = new rmq.Resource();
-            request.Group.ResourceNamespace = ResourceNamespace;
-            request.Group.Name = _group;
+            var request = new rmq.ReceiveMessageRequest
+            {
+                Group = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = _group
+                },
+                MessageQueue = new rmq.MessageQueue()
+            };
 
-            request.MessageQueue = new rmq.MessageQueue();
             request.MessageQueue.MergeFrom(messageQueue);
             request.BatchSize = batchSize;
+            request.InvisibleDuration = 
Duration.FromTimeSpan(invisibleDuration);
             
             // Client is responsible of extending message invisibility duration
             request.AutoRenew = false;
             
             var targetUrl = Utilities.TargetUrl(messageQueue);
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
             
-            return await _manager.ReceiveMessage(targetUrl, metadata, request, 
timeout);
+            return await Manager.ReceiveMessage(targetUrl, metadata, request, 
+                ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
         }
 
 
         public async Task Ack(Message message)
         {
-            var request = new rmq.AckMessageRequest();
-            request.Group = new rmq.Resource();
-            request.Group.ResourceNamespace = ResourceNamespace;
-            request.Group.Name = _group;
+            var request = new rmq.AckMessageRequest
+            {
+                Group = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = _group
+                },
+                Topic = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = message.Topic
+                }
+            };
 
-            request.Topic = new rmq.Resource();
-            request.Topic.ResourceNamespace = ResourceNamespace;
-            request.Topic.Name = message.Topic;
-            
             var entry = new rmq.AckMessageEntry();
             request.Entries.Add(entry);
             entry.MessageId = message.MessageId;
@@ -205,30 +248,33 @@ namespace Org.Apache.Rocketmq
 
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
-            await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
         }
 
         public async Task ChangeInvisibleDuration(Message message, TimeSpan 
invisibleDuration)
         {
-            var request = new rmq.ChangeInvisibleDurationRequest();
-            request.Group = new rmq.Resource();
-            request.Group.ResourceNamespace = ResourceNamespace;
-            request.Group.Name = _group;
-
-            request.Topic = new rmq.Resource();
-            request.Topic.ResourceNamespace = ResourceNamespace;
-            request.Topic.Name = message.Topic;
-
-            request.ReceiptHandle = message._receiptHandle;
-            request.MessageId = message.MessageId;
-            
-            request.InvisibleDuration = 
Duration.FromTimeSpan(invisibleDuration);
+            var request = new rmq.ChangeInvisibleDurationRequest
+            {
+                Group = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = _group
+                },
+                Topic = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = message.Topic
+                },
+                ReceiptHandle = message._receiptHandle,
+                MessageId = message.MessageId,
+                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+            };
 
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
-            await _manager.ChangeInvisibleDuration(targetUrl, metadata, 
request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            await Manager.ChangeInvisibleDuration(targetUrl, metadata, 
request, RequestTimeout);
         }
         
         private rmq.MessageQueue NextQueue()
@@ -238,35 +284,35 @@ namespace Org.Apache.Rocketmq
                 return null;
             }
             
-            UInt32 topicSeq = CurrentTopicSequence.Value;
-            CurrentTopicSequence.Value = topicSeq + 1;
+            var topicSeq = _currentTopicSequence.Value;
+            _currentTopicSequence.Value = topicSeq + 1;
 
             var total = _topicAssignments.Count;
             var topicIndex = topicSeq % total;
             var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
             
-            UInt32 queueSeq = CurrentQueueSequence.Value;
-            CurrentQueueSequence.Value = queueSeq + 1;
-            List<rmq.Assignment> assignments;
-            if (_topicAssignments.TryGetValue(topic, out assignments))
+            UInt32 queueSeq = _currentQueueSequence.Value;
+            _currentQueueSequence.Value = queueSeq + 1;
+            if (!_topicAssignments.TryGetValue(topic, out var assignments))
             {
-                if (null == assignments)
-                {
-                    return null;
-                }
-                var idx = queueSeq % assignments.Count;
-                return assignments[(int)idx].MessageQueue;
-
+                return null;
             }
 
-            return null;
+            var idx = queueSeq % assignments?.Count;
+            return assignments?[(int)idx].MessageQueue;
         }
 
-        private ThreadLocal<UInt32> CurrentTopicSequence = new 
ThreadLocal<UInt32>(true);
-        private ThreadLocal<UInt32> CurrentQueueSequence = new 
ThreadLocal<UInt32>(true);
+        private readonly ThreadLocal<UInt32> _currentTopicSequence = new 
ThreadLocal<UInt32>(true)
+        {
+            Value = 0
+        };
+        
+        private readonly ThreadLocal<UInt32> _currentQueueSequence = new 
ThreadLocal<UInt32>(true)
+        {
+            Value = 0
+        };
 
         private readonly string _group;
-        private bool _fifo;
         private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> 
_subscriptions;
         private readonly ConcurrentDictionary<string, List<rmq.Assignment>> 
_topicAssignments;
         private readonly CancellationTokenSource _scanAssignmentCts = new 
CancellationTokenSource();
diff --git a/csharp/tests/ClientManagerTest.cs 
b/csharp/tests/ClientManagerTest.cs
index af5983c..e12c027 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -48,7 +48,7 @@ namespace Org.Apache.Rocketmq
             clientConfig.CredentialsProvider = credentialsProvider;
             clientConfig.ResourceNamespace = resourceNamespace;
             clientConfig.Region = "cn-hangzhou-pre";
-            Signature.sign(clientConfig, metadata);
+            Signature.Sign(clientConfig, metadata);
             var clientManager = new ClientManager();
             string target = "https://116.62.231.199:80";;
             var topicRouteData = clientManager.ResolveRoute(target, metadata, 
request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index b5094a2..bcf36a5 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -26,30 +26,14 @@ namespace tests
     [TestClass]
     public class ProducerTest
     {
-
-        private static AccessPoint _accessPoint;
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
-        {
-            _accessPoint = new AccessPoint
-            {
-                Host = HOST,
-                Port = PORT
-            };
-        }
-
-        [ClassCleanup]
-        public static void TearDown()
-        {
-        }
-
         [TestMethod]
         public async Task TestLifecycle()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             await producer.Shutdown();
         }
@@ -57,21 +41,26 @@ namespace tests
         [TestMethod]
         public async Task TestSendStandardMessage()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            
-            // Tag the massage. A message has at most one tag.
-            msg.Tag = "Tag-0";
-            
+            var msg = new Message(TOPIC, body)
+            {
+                // Tag the massage. A message has at most one tag.
+                Tag = "Tag-0"
+            };
+
             // Associate the message with one or multiple keys
-            var keys = new List<string>();
-            keys.Add("k1");
-            keys.Add("k2");
+            var keys = new List<string>
+            {
+                "k1",
+                "k2"
+            };
             msg.Keys = keys;
             
             var sendResult = await producer.Send(msg);
@@ -82,23 +71,28 @@ namespace tests
         [TestMethod]
         public async Task TestSendMultipleMessages()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             for (var i = 0; i < 128; i++)
             {
-                var msg = new Message(topic, body);
-            
-                // Tag the massage. A message has at most one tag.
-                msg.Tag = "Tag-0";
-            
+                var msg = new Message(TOPIC, body)
+                {
+                    // Tag the massage. A message has at most one tag.
+                    Tag = "Tag-0"
+                };
+
                 // Associate the message with one or multiple keys
-                var keys = new List<string>();
-                keys.Add("k1");
-                keys.Add("k2");
+                var keys = new List<string>
+                {
+                    "k1",
+                    "k2"
+                };
                 msg.Keys = keys;
                 var sendResult = await producer.Send(msg);
                 Assert.IsNotNull(sendResult);                
@@ -109,17 +103,20 @@ namespace tests
         [TestMethod]
         public async Task TestSendFifoMessage()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            
-            // Messages of the same group will get delivered one after 
another. 
-            msg.MessageGroup = "message-group-0";
-            
+            var msg = new Message(TOPIC, body)
+            {
+                // Messages of the same group will get delivered one after 
another. 
+                MessageGroup = "message-group-0"
+            };
+
             // Verify messages are FIFO iff their message group is not null or 
empty.
             Assert.IsTrue(msg.Fifo());
 
@@ -131,15 +128,19 @@ namespace tests
         [TestMethod]
         public async Task TestSendScheduledMessage()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            
-            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            var msg = new Message(TOPIC, body)
+            {
+                DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+            };
+
             Assert.IsTrue(msg.Scheduled());
             
             var sendResult = await producer.Send(msg);
@@ -154,15 +155,19 @@ namespace tests
         [TestMethod]
         public async Task TestSendMessage_Failure()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            msg.MessageGroup = "Group-0";
-            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            var msg = new Message(TOPIC, body)
+            {
+                MessageGroup = "Group-0",
+                DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+            };
             Assert.IsTrue(msg.Scheduled());
 
             try
@@ -176,10 +181,9 @@ namespace tests
             await producer.Shutdown();
         }
         
-        private static string topic = "cpp_sdk_standard";
-
-        private static string HOST = "127.0.0.1";
-        private static int PORT = 8081;
+        private const string TOPIC = "cpp_sdk_standard";
+        private const string HOST = "127.0.0.1";
+        private const int PORT = 8081;
     }
 
 }
\ No newline at end of file
diff --git a/csharp/tests/RpcClientTest.cs b/csharp/tests/RpcClientTest.cs
index a1ecf82..b438047 100644
--- a/csharp/tests/RpcClientTest.cs
+++ b/csharp/tests/RpcClientTest.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.Rocketmq
             var rpc_client = new RpcClient(target);
             var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(client_config, metadata);
+            Signature.Sign(client_config, metadata);
 
             var cmd = new rmq::TelemetryCommand();
             cmd.Settings = new rmq::Settings();
@@ -107,7 +107,7 @@ namespace Org.Apache.Rocketmq
             var rpc_client = new RpcClient(target);
             var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(client_config, metadata);
+            Signature.Sign(client_config, metadata);
             var request = new rmq::QueryRouteRequest();
             request.Topic = new rmq::Resource();
             request.Topic.Name = "cpp_sdk_standard";
@@ -128,7 +128,7 @@ namespace Org.Apache.Rocketmq
             var rpc_client = new RpcClient(target);
             var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(client_config, metadata);
+            Signature.Sign(client_config, metadata);
 
             var request = new rmq::SendMessageRequest();
             var message = new rmq::Message();
diff --git a/csharp/tests/SignatureTest.cs b/csharp/tests/SignatureTest.cs
index 16d0f46..12d1c10 100644
--- a/csharp/tests/SignatureTest.cs
+++ b/csharp/tests/SignatureTest.cs
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using Moq;
-using System;
+using Org.Apache.Rocketmq;
 
-namespace Org.Apache.Rocketmq
+namespace tests
 {
 
     [TestClass]
@@ -27,11 +27,10 @@ namespace Org.Apache.Rocketmq
     {
 
         [TestMethod]
-        public void testSign()
+        public void TestSign()
         {
             var mock = new Mock<IClientConfig>();
             mock.Setup(x => x.getGroupName()).Returns("G1");
-            mock.Setup(x => x.tenantId()).Returns("Tenant-id");
             mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
             mock.Setup(x => x.serviceName()).Returns("mq");
             mock.Setup(x => x.region()).Returns("cn-hangzhou");
@@ -42,7 +41,7 @@ namespace Org.Apache.Rocketmq
             mock.Setup(x => 
x.credentialsProvider()).Returns(credentialsProvider);
 
             var metadata = new grpc::Metadata();
-            Signature.sign(mock.Object, metadata);
+            Signature.Sign(mock.Object, metadata);
             Assert.IsNotNull(metadata.Get(MetadataConstants.AUTHORIZATION));
         }
     }
diff --git a/csharp/tests/SimpleConsumerTest.cs 
b/csharp/tests/SimpleConsumerTest.cs
index e5fc8f0..6f49eb1 100644
--- a/csharp/tests/SimpleConsumerTest.cs
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -39,7 +39,7 @@ namespace tests
         public async Task TestLifecycle()
         {
             var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
-            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            simpleConsumer.Subscribe(_topic, new FilterExpression("*", 
ExpressionType.TAG));
             await simpleConsumer.Start();
             Thread.Sleep(1_000);
             await simpleConsumer.Shutdown();
@@ -49,11 +49,11 @@ namespace tests
         public async Task TestReceive()
         {
             var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
-            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            simpleConsumer.Subscribe(_topic, new FilterExpression("*", 
ExpressionType.TAG));
             await simpleConsumer.Start();
             var batchSize = 32;
-            var receiveTimeout = TimeSpan.FromSeconds(10);
-            var messages  = await simpleConsumer.Receive(batchSize, 
receiveTimeout);
+            var invisibleDuration = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, 
invisibleDuration);
             Assert.IsTrue(messages.Count > 0);
             Assert.IsTrue(messages.Count <= batchSize);
             await simpleConsumer.Shutdown();
@@ -64,11 +64,11 @@ namespace tests
         public async Task TestAck()
         {
             var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
-            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            simpleConsumer.Subscribe(_topic, new FilterExpression("*", 
ExpressionType.TAG));
             await simpleConsumer.Start();
             var batchSize = 32;
-            var receiveTimeout = TimeSpan.FromSeconds(10);
-            var messages  = await simpleConsumer.Receive(batchSize, 
receiveTimeout);
+            var invisibleDuration = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, 
invisibleDuration);
             foreach (var message in messages)
             {
                 await simpleConsumer.Ack(message);
@@ -81,11 +81,11 @@ namespace tests
         public async Task TestChangeInvisibleDuration()
         {
             var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
-            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            simpleConsumer.Subscribe(_topic, new FilterExpression("*", 
ExpressionType.TAG));
             await simpleConsumer.Start();
             var batchSize = 32;
-            var receiveTimeout = TimeSpan.FromSeconds(10);
-            var messages  = await simpleConsumer.Receive(batchSize, 
receiveTimeout);
+            var invisibleDuration = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, 
invisibleDuration);
             foreach (var message in messages)
             {
                 await simpleConsumer.ChangeInvisibleDuration(message, 
TimeSpan.FromSeconds(10));

Reply via email to