This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 70282e854e28643057cabe7fb32e30aa34922d96 Author: Aaron Ai <[email protected]> AuthorDate: Mon Feb 13 20:23:19 2023 +0800 Add Client#GetTopics --- csharp/rocketmq-client-csharp/Client.cs | 53 +++++++++++++++---------- csharp/rocketmq-client-csharp/Consumer.cs | 4 +- csharp/rocketmq-client-csharp/Producer.cs | 9 ++++- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 10 +++-- 4 files changed, 47 insertions(+), 29 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 133fed00..afbfbe5b 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -31,21 +31,22 @@ namespace Org.Apache.Rocketmq { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); - private static readonly TimeSpan HeartbeatScheduleDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan HeartbeatScheduleDelay = TimeSpan.FromSeconds(1); + private static readonly TimeSpan HeartbeatSchedulePeriod = TimeSpan.FromSeconds(10); private readonly CancellationTokenSource _heartbeatCts; - private static readonly TimeSpan TopicRouteUpdateScheduleDelay = TimeSpan.FromSeconds(30); + private static readonly TimeSpan TopicRouteUpdateScheduleDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan TopicRouteUpdateSchedulePeriod = TimeSpan.FromSeconds(30); private readonly CancellationTokenSource _topicRouteUpdateCtx; - private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromMinutes(5); + private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1); + private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5); private readonly CancellationTokenSource _settingsSyncCtx; protected readonly ClientConfig ClientConfig; protected readonly IClientManager ClientManager; protected readonly string ClientId; - protected readonly ICollection<string> Topics; - protected readonly ConcurrentDictionary<Endpoints, bool> Isolated; private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache; private readonly CancellationTokenSource _telemetryCts; @@ -53,10 +54,9 @@ namespace Org.Apache.Rocketmq private readonly Dictionary<Endpoints, Session> _sessionsTable; private readonly ReaderWriterLockSlim _sessionLock; - protected Client(ClientConfig clientConfig, ICollection<string> topics) + protected Client(ClientConfig clientConfig) { ClientConfig = clientConfig; - Topics = topics; ClientId = Utilities.GetClientId(); ClientManager = new ClientManager(this); @@ -75,10 +75,12 @@ namespace Org.Apache.Rocketmq public virtual async Task Start() { Logger.Debug($"Begin to start the rocketmq client, clientId={ClientId}"); - ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, _topicRouteUpdateCtx.Token); - ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, _heartbeatCts.Token); - ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, _settingsSyncCtx.Token); - foreach (var topic in Topics) + ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, TopicRouteUpdateSchedulePeriod, + _topicRouteUpdateCtx.Token); + ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, HeartbeatSchedulePeriod, _heartbeatCts.Token); + ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, SettingsSyncSchedulePeriod, + _settingsSyncCtx.Token); + foreach (var topic in GetTopics()) { await FetchTopicRoute(topic); } @@ -132,6 +134,8 @@ namespace Org.Apache.Rocketmq } } + protected abstract ICollection<string> GetTopics(); + protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest(); @@ -183,10 +187,9 @@ namespace Org.Apache.Rocketmq private async void UpdateTopicRouteCache() { Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); - foreach (var topic in Topics) + foreach (var topic in GetTopics()) { - var topicRouteData = await FetchTopicRoute(topic); - _topicRouteCache[topic] = topicRouteData; + await FetchTopicRoute(topic); } } @@ -199,10 +202,11 @@ namespace Org.Apache.Rocketmq } } - private void ScheduleWithFixedDelay(Action action, TimeSpan period, CancellationToken token) + private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token) { Task.Run(async () => { + await Task.Delay(delay, token); while (!token.IsCancellationRequested) { try @@ -221,7 +225,18 @@ namespace Org.Apache.Rocketmq }, token); } - protected async Task<TopicRouteData> FetchTopicRoute(string topic) + protected async Task<TopicRouteData> GetRouteData(string topic) + { + if (_topicRouteCache.TryGetValue(topic, out var topicRouteData)) + { + return topicRouteData; + } + + topicRouteData = await FetchTopicRoute(topic); + return topicRouteData; + } + + private async Task<TopicRouteData> FetchTopicRoute(string topic) { var topicRouteData = await FetchTopicRoute0(topic); await OnTopicRouteDataFetched(topic, topicRouteData); @@ -260,12 +275,6 @@ namespace Org.Apache.Rocketmq private async void Heartbeat() { var endpoints = GetTotalRouteEndpoints(); - if (0 == endpoints.Count) - { - Logger.Debug("No broker endpoints available in topic route"); - return; - } - var request = WrapHeartbeatRequest(); Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new(); // Collect task into a map. diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs index 25df84d3..9a58104f 100644 --- a/csharp/rocketmq-client-csharp/Consumer.cs +++ b/csharp/rocketmq-client-csharp/Consumer.cs @@ -28,8 +28,8 @@ namespace Org.Apache.Rocketmq { protected readonly string ConsumerGroup; - protected Consumer(ClientConfig clientConfig, string consumerGroup, ICollection<string> topics) : base( - clientConfig, topics) + protected Consumer(ClientConfig clientConfig, string consumerGroup) : base( + clientConfig) { ConsumerGroup = consumerGroup; } diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index cc7794f6..7803e2aa 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -44,7 +44,7 @@ namespace Org.Apache.Rocketmq private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, int maxAttempts) : - base(clientConfig, publishingTopics.Keys) + base(clientConfig) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, @@ -61,6 +61,11 @@ namespace Org.Apache.Rocketmq } } + protected override ICollection<string> GetTopics() + { + return _publishingTopics.Keys; + } + public override async Task Start() { Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}"); @@ -90,7 +95,7 @@ namespace Org.Apache.Rocketmq return publishingLoadBalancer; } - var topicRouteData = await FetchTopicRoute(topic); + var topicRouteData = await GetRouteData(topic); publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer); diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index cb380d89..0481f7e8 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -43,8 +43,7 @@ namespace Org.Apache.Rocketmq } private SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration, - ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(clientConfig, consumerGroup, - subscriptionExpressions.Keys) + ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(clientConfig, consumerGroup) { _awaitDuration = awaitDuration; _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>(); @@ -79,6 +78,11 @@ namespace Org.Apache.Rocketmq await base.Shutdown(); Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); } + + protected override ICollection<string> GetTopics() + { + return _subscriptionExpressions.Keys; + } protected override Proto.HeartbeatRequest WrapHeartbeatRequest() { @@ -106,7 +110,7 @@ namespace Org.Apache.Rocketmq return subscriptionLoadBalancer; } - var topicRouteData = await FetchTopicRoute(topic); + var topicRouteData = await GetRouteData(topic); subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData); _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
