This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch observability in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
commit 86083310e94f41a27d470f6b8d63b27394e63168 Author: Li Zhanhui <[email protected]> AuthorDate: Tue Jun 21 16:44:26 2022 +0800 WIP --- rocketmq-client-csharp/Client.cs | 15 +++++++++++++++ rocketmq-client-csharp/IClient.cs | 1 + rocketmq-client-csharp/Producer.cs | 2 ++ rocketmq-client-csharp/Session.cs | 2 +- rocketmq-client-csharp/SimpleConsumer.cs | 16 ++++++++++++++++ 5 files changed, 35 insertions(+), 1 deletion(-) diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs index 1eb368e..607daf4 100644 --- a/rocketmq-client-csharp/Client.cs +++ b/rocketmq-client-csharp/Client.cs @@ -398,6 +398,21 @@ namespace Org.Apache.Rocketmq return true; } + public virtual void OnReceive(rmq::Settings settings) + { + if (null != settings.Metric) + { + _clientSettings.Metric = new rmq::Metric(); + _clientSettings.Metric.MergeFrom(settings.Metric); + } + + if (null != settings.BackoffPolicy) + { + _clientSettings.BackoffPolicy = new rmq::RetryPolicy(); + _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy); + } + } + protected readonly IClientManager Manager; private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable; diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs index abdcc21..4b7206b 100644 --- a/rocketmq-client-csharp/IClient.cs +++ b/rocketmq-client-csharp/IClient.cs @@ -31,5 +31,6 @@ namespace Org.Apache.Rocketmq void BuildClientSetting(rmq::Settings settings); + void OnReceive(rmq::Settings settings); } } \ No newline at end of file diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs index bfcc1d3..32606ae 100644 --- a/rocketmq-client-csharp/Producer.cs +++ b/rocketmq-client-csharp/Producer.cs @@ -47,7 +47,9 @@ namespace Org.Apache.Rocketmq protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request) { + request.ClientType = rmq::ClientType.Producer; + // Concept of ProducerGroup has been removed. } public async Task<SendReceipt> Send(Message message) diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs index 3e234f2..f5e7795 100644 --- a/rocketmq-client-csharp/Session.cs +++ b/rocketmq-client-csharp/Session.cs @@ -60,8 +60,8 @@ namespace Org.Apache.Rocketmq } case rmq::TelemetryCommand.CommandOneofCase.Settings: { - Logger.Info($"Received settings from server {cmd.Settings.ToString()}"); + _client.OnReceive(cmd.Settings); break; } case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand: diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs index 4c447c9..9eaf365 100644 --- a/rocketmq-client-csharp/SimpleConsumer.cs +++ b/rocketmq-client-csharp/SimpleConsumer.cs @@ -19,6 +19,7 @@ using rmq = Apache.Rocketmq.V2; using NLog; using System.Collections.Generic; using System.Collections.Concurrent; +using Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -63,6 +64,10 @@ namespace Org.Apache.Rocketmq protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request) { + request.ClientType = rmq::ClientType.SimpleConsumer; + request.Group = new rmq::Resource(); + request.Group.Name = Group; + request.Group.ResourceNamespace = ResourceNamespace; } public void Subscribe(string topic, rmq::FilterType filterType, string expression) @@ -77,6 +82,17 @@ namespace Org.Apache.Rocketmq subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; }); } + public override void OnReceive(Settings settings) + { + base.OnReceive(settings); + + if (settings.Subscription.Fifo) + { + fifo_ = true; + } + + } + private string group_; public string Group
