This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 70282e854e28643057cabe7fb32e30aa34922d96
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Feb 13 20:23:19 2023 +0800

    Add Client#GetTopics
---
 csharp/rocketmq-client-csharp/Client.cs         | 53 +++++++++++++++----------
 csharp/rocketmq-client-csharp/Consumer.cs       |  4 +-
 csharp/rocketmq-client-csharp/Producer.cs       |  9 ++++-
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 10 +++--
 4 files changed, 47 insertions(+), 29 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 133fed00..afbfbe5b 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -31,21 +31,22 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = 
MqLogManager.Instance.GetCurrentClassLogger();
 
-        private static readonly TimeSpan HeartbeatScheduleDelay = 
TimeSpan.FromSeconds(10);
+        private static readonly TimeSpan HeartbeatScheduleDelay = 
TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan HeartbeatSchedulePeriod = 
TimeSpan.FromSeconds(10);
         private readonly CancellationTokenSource _heartbeatCts;
 
-        private static readonly TimeSpan TopicRouteUpdateScheduleDelay = 
TimeSpan.FromSeconds(30);
+        private static readonly TimeSpan TopicRouteUpdateScheduleDelay = 
TimeSpan.FromSeconds(10);
+        private static readonly TimeSpan TopicRouteUpdateSchedulePeriod = 
TimeSpan.FromSeconds(30);
         private readonly CancellationTokenSource _topicRouteUpdateCtx;
 
-        private static readonly TimeSpan SettingsSyncScheduleDelay = 
TimeSpan.FromMinutes(5);
+        private static readonly TimeSpan SettingsSyncScheduleDelay = 
TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan SettingsSyncSchedulePeriod = 
TimeSpan.FromMinutes(5);
         private readonly CancellationTokenSource _settingsSyncCtx;
 
         protected readonly ClientConfig ClientConfig;
         protected readonly IClientManager ClientManager;
         protected readonly string ClientId;
 
-        protected readonly ICollection<string> Topics;
-
         protected readonly ConcurrentDictionary<Endpoints, bool> Isolated;
         private readonly ConcurrentDictionary<string, TopicRouteData> 
_topicRouteCache;
         private readonly CancellationTokenSource _telemetryCts;
@@ -53,10 +54,9 @@ namespace Org.Apache.Rocketmq
         private readonly Dictionary<Endpoints, Session> _sessionsTable;
         private readonly ReaderWriterLockSlim _sessionLock;
 
-        protected Client(ClientConfig clientConfig, ICollection<string> topics)
+        protected Client(ClientConfig clientConfig)
         {
             ClientConfig = clientConfig;
-            Topics = topics;
             ClientId = Utilities.GetClientId();
 
             ClientManager = new ClientManager(this);
@@ -75,10 +75,12 @@ namespace Org.Apache.Rocketmq
         public virtual async Task Start()
         {
             Logger.Debug($"Begin to start the rocketmq client, 
clientId={ClientId}");
-            ScheduleWithFixedDelay(UpdateTopicRouteCache, 
TopicRouteUpdateScheduleDelay, _topicRouteUpdateCtx.Token);
-            ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, 
_heartbeatCts.Token);
-            ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, 
_settingsSyncCtx.Token);
-            foreach (var topic in Topics)
+            ScheduleWithFixedDelay(UpdateTopicRouteCache, 
TopicRouteUpdateScheduleDelay, TopicRouteUpdateSchedulePeriod,
+                _topicRouteUpdateCtx.Token);
+            ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, 
HeartbeatSchedulePeriod, _heartbeatCts.Token);
+            ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, 
SettingsSyncSchedulePeriod,
+                _settingsSyncCtx.Token);
+            foreach (var topic in GetTopics())
             {
                 await FetchTopicRoute(topic);
             }
@@ -132,6 +134,8 @@ namespace Org.Apache.Rocketmq
             }
         }
 
+        protected abstract ICollection<string> GetTopics();
+
         protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
 
 
@@ -183,10 +187,9 @@ namespace Org.Apache.Rocketmq
         private async void UpdateTopicRouteCache()
         {
             Logger.Info($"Start to update topic route cache for a new round, 
clientId={ClientId}");
-            foreach (var topic in Topics)
+            foreach (var topic in GetTopics())
             {
-                var topicRouteData = await FetchTopicRoute(topic);
-                _topicRouteCache[topic] = topicRouteData;
+                await FetchTopicRoute(topic);
             }
         }
 
@@ -199,10 +202,11 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        private void ScheduleWithFixedDelay(Action action, TimeSpan period, 
CancellationToken token)
+        private void ScheduleWithFixedDelay(Action action, TimeSpan delay, 
TimeSpan period, CancellationToken token)
         {
             Task.Run(async () =>
             {
+                await Task.Delay(delay, token);
                 while (!token.IsCancellationRequested)
                 {
                     try
@@ -221,7 +225,18 @@ namespace Org.Apache.Rocketmq
             }, token);
         }
 
-        protected async Task<TopicRouteData> FetchTopicRoute(string topic)
+        protected async Task<TopicRouteData> GetRouteData(string topic)
+        {
+            if (_topicRouteCache.TryGetValue(topic, out var topicRouteData))
+            {
+                return topicRouteData;
+            }
+
+            topicRouteData = await FetchTopicRoute(topic);
+            return topicRouteData;
+        }
+
+        private async Task<TopicRouteData> FetchTopicRoute(string topic)
         {
             var topicRouteData = await FetchTopicRoute0(topic);
             await OnTopicRouteDataFetched(topic, topicRouteData);
@@ -260,12 +275,6 @@ namespace Org.Apache.Rocketmq
         private async void Heartbeat()
         {
             var endpoints = GetTotalRouteEndpoints();
-            if (0 == endpoints.Count)
-            {
-                Logger.Debug("No broker endpoints available in topic route");
-                return;
-            }
-
             var request = WrapHeartbeatRequest();
             Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = 
new();
             // Collect task into a map.
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 25df84d3..9a58104f 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -28,8 +28,8 @@ namespace Org.Apache.Rocketmq
     {
         protected readonly string ConsumerGroup;
 
-        protected Consumer(ClientConfig clientConfig, string consumerGroup, 
ICollection<string> topics) : base(
-            clientConfig, topics)
+        protected Consumer(ClientConfig clientConfig, string consumerGroup) : 
base(
+            clientConfig)
         {
             ConsumerGroup = consumerGroup;
         }
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index cc7794f6..7803e2aa 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -44,7 +44,7 @@ namespace Org.Apache.Rocketmq
 
         private Producer(ClientConfig clientConfig, 
ConcurrentDictionary<string, bool> publishingTopics,
             int maxAttempts) :
-            base(clientConfig, publishingTopics.Keys)
+            base(clientConfig)
         {
             var retryPolicy = 
ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
             _publishingSettings = new PublishingSettings(ClientId, 
clientConfig.Endpoints, retryPolicy,
@@ -61,6 +61,11 @@ namespace Org.Apache.Rocketmq
             }
         }
 
+        protected override ICollection<string> GetTopics()
+        {
+            return _publishingTopics.Keys;
+        }
+
         public override async Task Start()
         {
             Logger.Info($"Begin to start the rocketmq producer, 
clientId={ClientId}");
@@ -90,7 +95,7 @@ namespace Org.Apache.Rocketmq
                 return publishingLoadBalancer;
             }
 
-            var topicRouteData = await FetchTopicRoute(topic);
+            var topicRouteData = await GetRouteData(topic);
             publishingLoadBalancer = new 
PublishingLoadBalancer(topicRouteData);
             _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
 
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index cb380d89..0481f7e8 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -43,8 +43,7 @@ namespace Org.Apache.Rocketmq
         }
 
         private SimpleConsumer(ClientConfig clientConfig, string 
consumerGroup, TimeSpan awaitDuration,
-            ConcurrentDictionary<string, FilterExpression> 
subscriptionExpressions) : base(clientConfig, consumerGroup,
-            subscriptionExpressions.Keys)
+            ConcurrentDictionary<string, FilterExpression> 
subscriptionExpressions) : base(clientConfig, consumerGroup)
         {
             _awaitDuration = awaitDuration;
             _subscriptionRouteDataCache = new ConcurrentDictionary<string, 
SubscriptionLoadBalancer>();
@@ -79,6 +78,11 @@ namespace Org.Apache.Rocketmq
             await base.Shutdown();
             Logger.Info($"The rocketmq simple consumer starts successfully, 
clientId={ClientId}");
         }
+        
+        protected override ICollection<string> GetTopics()
+        {
+            return _subscriptionExpressions.Keys;
+        }
 
         protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
         {
@@ -106,7 +110,7 @@ namespace Org.Apache.Rocketmq
                 return subscriptionLoadBalancer;
             }
 
-            var topicRouteData = await FetchTopicRoute(topic);
+            var topicRouteData = await GetRouteData(topic);
             subscriptionLoadBalancer = new 
SubscriptionLoadBalancer(topicRouteData);
             _subscriptionRouteDataCache.TryAdd(topic, 
subscriptionLoadBalancer);
 

Reply via email to