This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit d7a0a5c6a62a7a6b9e4e3d1050cf45b5694dfb95 Author: Aaron Ai <[email protected]> AuthorDate: Fri Feb 17 10:47:51 2023 +0800 Add more method for IClientConfig --- csharp/rocketmq-client-csharp/Client.cs | 6 ++++-- csharp/rocketmq-client-csharp/ClientConfig.cs | 4 ++-- csharp/rocketmq-client-csharp/IClientConfig.cs | 6 ++++++ csharp/rocketmq-client-csharp/Producer.cs | 2 +- csharp/rocketmq-client-csharp/Session.cs | 9 ++++++--- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 2 +- 6 files changed, 20 insertions(+), 9 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 8e49851a..9311b48a 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -48,6 +48,7 @@ namespace Org.Apache.Rocketmq private readonly CancellationTokenSource _statsCts; protected readonly ClientConfig ClientConfig; + protected readonly Endpoints Endpoints; protected readonly IClientManager ClientManager; protected readonly string ClientId; @@ -61,6 +62,7 @@ namespace Org.Apache.Rocketmq protected Client(ClientConfig clientConfig) { ClientConfig = clientConfig; + Endpoints = new Endpoints(clientConfig.Endpoints); ClientId = Utilities.GetClientId(); ClientManager = new ClientManager(this); @@ -288,11 +290,11 @@ namespace Org.Apache.Rocketmq { Name = topic }, - Endpoints = ClientConfig.Endpoints.ToProtobuf() + Endpoints = Endpoints.ToProtobuf() }; var response = - await ClientManager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout); + await ClientManager.QueryRoute(Endpoints, request, ClientConfig.RequestTimeout); var code = response.Status.Code; if (!Proto.Code.Ok.Equals(code)) { diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs index 7e434eae..82ee8403 100644 --- a/csharp/rocketmq-client-csharp/ClientConfig.cs +++ b/csharp/rocketmq-client-csharp/ClientConfig.cs @@ -25,13 +25,13 @@ namespace Org.Apache.Rocketmq public ClientConfig(string endpoints) { RequestTimeout = TimeSpan.FromSeconds(3); - Endpoints = new Endpoints(endpoints); + Endpoints = endpoints; } public ICredentialsProvider CredentialsProvider { get; set; } public TimeSpan RequestTimeout { get; set; } - public Endpoints Endpoints { get; } + public string Endpoints { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs b/csharp/rocketmq-client-csharp/IClientConfig.cs index a50bdf93..d1b7ffe6 100644 --- a/csharp/rocketmq-client-csharp/IClientConfig.cs +++ b/csharp/rocketmq-client-csharp/IClientConfig.cs @@ -15,10 +15,16 @@ * limitations under the License. */ +using System; + namespace Org.Apache.Rocketmq { public interface IClientConfig { ICredentialsProvider CredentialsProvider { get; } + + TimeSpan RequestTimeout { get; } + + string Endpoints { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index f9825d32..c4e78e91 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -47,7 +47,7 @@ namespace Org.Apache.Rocketmq base(clientConfig) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); - PublishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, + PublishingSettings = new PublishingSettings(ClientId, Endpoints, retryPolicy, clientConfig.RequestTimeout, publishingTopics); _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>(); _publishingTopics = publishingTopics; diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs index 24ff9c7c..0d35be0a 100644 --- a/csharp/rocketmq-client-csharp/Session.cs +++ b/csharp/rocketmq-client-csharp/Session.cs @@ -15,6 +15,7 @@ * limitations under the License. */ +using System; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -30,6 +31,8 @@ namespace Org.Apache.Rocketmq { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); + private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3); + private readonly grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> _streamingCall; @@ -57,12 +60,12 @@ namespace Org.Apache.Rocketmq { var writer = _streamingCall.RequestStream; await writer.WriteAsync(telemetryCommand); - } + } public async Task SyncSettings(bool awaitResp) { - // TODO - await _semaphore.WaitAsync(); + // Add more buffer time. + await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout)); try { var writer = _streamingCall.RequestStream; diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index cdd5aa64..f6abd5ea 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -48,7 +48,7 @@ namespace Org.Apache.Rocketmq _awaitDuration = awaitDuration; _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>(); _subscriptionExpressions = subscriptionExpressions; - _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, clientConfig.Endpoints, + _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, Endpoints, ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions); _topicRoundRobinIndex = 0; }
