This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/csharp_dev by this push:
     new 0185295  Complete simple consumer example
0185295 is described below

commit 0185295c5412e017b19e0b8b59de2c6c2e36576b
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Sep 1 17:26:55 2022 +0800

    Complete simple consumer example
---
 csharp/examples/Program.cs                      | 44 ++++++++++++-------------
 csharp/rocketmq-client-csharp/Client.cs         |  9 ++---
 csharp/rocketmq-client-csharp/ClientManager.cs  |  5 +++
 csharp/rocketmq-client-csharp/IClient.cs        |  2 +-
 csharp/rocketmq-client-csharp/RpcClient.cs      |  7 ++--
 csharp/rocketmq-client-csharp/SimpleConsumer.cs |  7 +++-
 6 files changed, 44 insertions(+), 30 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index f3c6027..abc89ce 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -123,27 +123,27 @@ namespace examples
         static async Task Main(string[] args)
         {
             var credentialsProvider = new ConfigFileCredentialsProvider();
-            // var producer = new Producer(ACCESS_URL)
-            // {
-            //     CredentialsProvider = credentialsProvider
-            // };
-            // producer.AddTopicOfInterest(STANDARD_TOPIC);
-            // producer.AddTopicOfInterest(FIFO_TOPIC);
-            // producer.AddTopicOfInterest(TIMED_TOPIC);
-            // producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
-            //
-            // await producer.Start();
-            //
-            // var sendReceiptOfStandardMessage = await 
SendStandardMessage(producer);
-            // Console.WriteLine($"Standard message-id: 
{sendReceiptOfStandardMessage.MessageId}");
-            //
-            // var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
-            // Console.WriteLine($"FIFO message-id: 
{sendReceiptOfFifoMessage.MessageId}");
-            //
-            // var sendReceiptOfTimedMessage = await 
SendTimedMessage(producer);
-            // Console.WriteLine($"Timed message-id: 
{sendReceiptOfTimedMessage.MessageId}");
-            //
-            // await producer.Shutdown();
+            var producer = new Producer(ACCESS_URL)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            producer.AddTopicOfInterest(STANDARD_TOPIC);
+            producer.AddTopicOfInterest(FIFO_TOPIC);
+            producer.AddTopicOfInterest(TIMED_TOPIC);
+            producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
+            
+            await producer.Start();
+            
+            var sendReceiptOfStandardMessage = await 
SendStandardMessage(producer);
+            Console.WriteLine($"Standard message-id: 
{sendReceiptOfStandardMessage.MessageId}");
+            
+            var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
+            Console.WriteLine($"FIFO message-id: 
{sendReceiptOfFifoMessage.MessageId}");
+            
+            var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
+            Console.WriteLine($"Timed message-id: 
{sendReceiptOfTimedMessage.MessageId}");
+            
+            await producer.Shutdown();
 
             Console.WriteLine("Now start a simple consumer");
             var simpleConsumer = new SimpleConsumer(ACCESS_URL, 
CONCURRENT_GROUP)
@@ -156,7 +156,7 @@ namespace examples
 
             await ConsumeAndAckMessages(simpleConsumer);
 
-            // await simpleConsumer.Shutdown();
+            await simpleConsumer.Shutdown();
 
             Console.ReadKey();
         }
diff --git a/csharp/rocketmq-client-csharp/Client.cs 
b/csharp/rocketmq-client-csharp/Client.cs
index 675b9eb..2e6a6ec 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -449,12 +449,13 @@ namespace Org.Apache.Rocketmq
             return await Manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
         }
 
-        public async Task<bool> NotifyClientTermination()
+        public async Task<bool> NotifyClientTermination(rmq.Resource group)
         {
             List<string> endpoints = AvailableBrokerEndpoints();
-            var request = new rmq::NotifyClientTerminationRequest();
-
-
+            var request = new rmq::NotifyClientTerminationRequest
+            {
+                Group = group
+            };
             var metadata = new grpc.Metadata();
             Signature.Sign(this, metadata);
 
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs 
b/csharp/rocketmq-client-csharp/ClientManager.cs
index a64cdf7..c2e00e8 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -191,6 +191,11 @@ namespace Org.Apache.Rocketmq
                                 Logger.Warn("TooManyRequest: servers 
throttled");
                                 break;
                             }
+                            case rmq.Code.MessageNotFound:
+                            {
+                                Logger.Info("No message is found in the 
server");
+                                break;
+                            }
                             default:
                             {
                                 Logger.Warn("Unknown error status");
diff --git a/csharp/rocketmq-client-csharp/IClient.cs 
b/csharp/rocketmq-client-csharp/IClient.cs
index a96e940..b1e992a 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq
 
         Task Heartbeat();
 
-        Task<bool> NotifyClientTermination();
+        Task<bool> NotifyClientTermination(rmq.Resource group);
 
         void BuildClientSetting(rmq::Settings settings);
         
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs 
b/csharp/rocketmq-client-csharp/RpcClient.cs
index dc9d753..50cd8e9 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -34,9 +34,11 @@ namespace Org.Apache.Rocketmq
         protected static readonly Logger Logger = 
MqLogManager.Instance.GetCurrentClassLogger();
         private readonly rmq::MessagingService.MessagingServiceClient _stub;
         private readonly GrpcChannel _channel;
+        private readonly string _target;
 
         public RpcClient(string target)
         {
+            _target = target;
             _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
             {
                 HttpHandler = CreateHttpHandler()
@@ -125,15 +127,16 @@ namespace Org.Apache.Rocketmq
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
             var call = _stub.ReceiveMessage(request, callOptions);
+            Logger.Debug($"ReceiveMessageRequest has been written to 
{_target}");
             var result = new List<rmq::ReceiveMessageResponse>();
             var stream = call.ResponseStream;
             while (await stream.MoveNext())
             {
                 var entry = stream.Current;
-                Logger.Debug($"Got ReceiveMessageResponse {entry}");
+                Logger.Debug($"Got ReceiveMessageResponse {entry} from 
{_target}");
                 result.Add(entry);
             }
-            Logger.Debug($"Receiving of messages completed");
+            Logger.Debug($"Receiving messages from {_target} completed");
             return result;
         }
 
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs 
b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index b9d83ce..60f0ba9 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -78,7 +78,12 @@ namespace Org.Apache.Rocketmq
         {
             _scanAssignmentCts.Cancel();
             await base.Shutdown();
-            if (!await NotifyClientTermination())
+            var group = new rmq.Resource()
+            {
+                Name = _group,
+                ResourceNamespace = "",
+            };
+            if (!await NotifyClientTermination(group))
             {
                 Logger.Warn("Failed to NotifyClientTermination");
             }

Reply via email to