This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 8f7418ed80b62ac163b20ea33aaa0058fa9279fe Author: Aaron Ai <[email protected]> AuthorDate: Mon Feb 13 17:27:23 2023 +0800 Bugfix: ICollection is read-only --- csharp/examples/ProducerBenchmark.cs | 32 +++++++++++++++++++++++++ csharp/rocketmq-client-csharp/Client.cs | 7 ++++-- csharp/rocketmq-client-csharp/IClient.cs | 2 +- csharp/rocketmq-client-csharp/Producer.cs | 19 +++++++-------- csharp/rocketmq-client-csharp/Session.cs | 5 ++-- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 27 ++++++++++++++------- 6 files changed, 67 insertions(+), 25 deletions(-) diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs new file mode 100644 index 00000000..361aa95d --- /dev/null +++ b/csharp/examples/ProducerBenchmark.cs @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +using System.Threading.Tasks; +using NLog; +using Org.Apache.Rocketmq; + +namespace examples +{ + public class ProducerBenchmark + { + private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); + + internal static async Task QuickStart() + { + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 3b260022..133fed00 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -333,7 +333,7 @@ namespace Org.Apache.Rocketmq return _telemetryCts; } - public abstract Proto.Settings GetSettings(); + public abstract Settings GetSettings(); public string GetClientId() { @@ -358,6 +358,9 @@ namespace Org.Apache.Rocketmq { } - public abstract void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings); + public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings) + { + GetSettings().Sync(settings); + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs index 5ba4c6f1..fc4c0127 100644 --- a/csharp/rocketmq-client-csharp/IClient.cs +++ b/csharp/rocketmq-client-csharp/IClient.cs @@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq ClientConfig GetClientConfig(); - Proto.Settings GetSettings(); + Settings GetSettings(); /// <summary> /// Get the identifier of current client. 
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 62387a98..6a9040ec 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -30,6 +30,7 @@ namespace Org.Apache.Rocketmq private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache; private readonly PublishingSettings _publishingSettings; + private readonly ConcurrentDictionary<string, bool> _publishingTopics; public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3) @@ -41,20 +42,21 @@ namespace Org.Apache.Rocketmq { } - private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) : - base(clientConfig, topics.Keys) + private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, int maxAttempts) : + base(clientConfig, publishingTopics.Keys) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, - clientConfig.RequestTimeout, topics); + clientConfig.RequestTimeout, publishingTopics); _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>(); + _publishingTopics = publishingTopics; } public void SetTopics(params string[] topics) { foreach (var topic in topics) { - Topics.Add(topic); + _publishingTopics.TryAdd(topic, true); } } @@ -183,14 +185,9 @@ namespace Org.Apache.Rocketmq } } - public override Proto.Settings GetSettings() + public override Settings GetSettings() { - return _publishingSettings.ToProtobuf(); - } - - public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings) - { - _publishingSettings.Sync(settings); + return _publishingSettings; } } } \ No newline at end of file 
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs index 99d61268..dd3da7bd 100644 --- a/csharp/rocketmq-client-csharp/Session.cs +++ b/csharp/rocketmq-client-csharp/Session.cs @@ -15,7 +15,6 @@ * limitations under the License. */ -using System; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -63,9 +62,9 @@ namespace Org.Apache.Rocketmq var writer = _streamingCall.RequestStream; // await readTask; var settings = _client.GetSettings(); - Proto.TelemetryCommand telemetryCommand = new Proto.TelemetryCommand + var telemetryCommand = new Proto.TelemetryCommand { - Settings = settings + Settings = settings.ToProtobuf() }; await writer.WriteAsync(telemetryCommand); // await writer.CompleteAsync(); diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index 1a0f0ec2..cb380d89 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -77,17 +94,11 @@ namespace Org.Apache.Rocketmq _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer); } - public override Proto.Settings GetSettings() + public override Settings GetSettings() { - return _simpleSubscriptionSettings.ToProtobuf(); + return _simpleSubscriptionSettings; } - public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings) - { - _simpleSubscriptionSettings.Sync(settings); - } - - private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic) { if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))
