This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 7be8d6da9fae153f5781fea13022849e9288d4d6
Author: Aaron Ai <[email protected]>
AuthorDate: Fri Feb 17 10:31:42 2023 +0800

    Add more stats info
---
 csharp/rocketmq-client-csharp/Client.cs | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 63c24c91..8e49851a 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -37,11 +37,15 @@ namespace Org.Apache.Rocketmq
 
         private static readonly TimeSpan TopicRouteUpdateScheduleDelay = TimeSpan.FromSeconds(10);
         private static readonly TimeSpan TopicRouteUpdateSchedulePeriod = TimeSpan.FromSeconds(30);
-        private readonly CancellationTokenSource _topicRouteUpdateCtx;
+        private readonly CancellationTokenSource _topicRouteUpdateCts;
 
         private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
         private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
-        private readonly CancellationTokenSource _settingsSyncCtx;
+        private readonly CancellationTokenSource _settingsSyncCts;
+
+        private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(1);
+        private readonly CancellationTokenSource _statsCts;
 
         protected readonly ClientConfig ClientConfig;
         protected readonly IClientManager ClientManager;
@@ -63,10 +67,11 @@ namespace Org.Apache.Rocketmq
             Isolated = new ConcurrentDictionary<Endpoints, bool>();
             _topicRouteCache = new ConcurrentDictionary<string, TopicRouteData>();
 
-            _topicRouteUpdateCtx = new CancellationTokenSource();
-            _settingsSyncCtx = new CancellationTokenSource();
+            _topicRouteUpdateCts = new CancellationTokenSource();
+            _settingsSyncCts = new CancellationTokenSource();
             _heartbeatCts = new CancellationTokenSource();
             _telemetryCts = new CancellationTokenSource();
+            _statsCts = new CancellationTokenSource();
 
             _sessionsTable = new Dictionary<Endpoints, Session>();
             _sessionLock = new ReaderWriterLockSlim();
@@ -76,10 +81,11 @@ namespace Org.Apache.Rocketmq
         {
             Logger.Debug($"Begin to start the rocketmq client, clientId={ClientId}");
             ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, TopicRouteUpdateSchedulePeriod,
-                _topicRouteUpdateCtx.Token);
+                _topicRouteUpdateCts.Token);
             ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, HeartbeatSchedulePeriod, _heartbeatCts.Token);
             ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, SettingsSyncSchedulePeriod,
-                _settingsSyncCtx.Token);
+                _settingsSyncCts.Token);
+            ScheduleWithFixedDelay(Stats, StatsScheduleDelay, StatsSchedulePeriod, _statsCts.Token);
             foreach (var topic in GetTopics())
             {
                 await FetchTopicRoute(topic);
@@ -91,7 +97,7 @@ namespace Org.Apache.Rocketmq
         public virtual async Task Shutdown()
         {
             Logger.Debug($"Begin to shutdown rocketmq client, clientId={ClientId}");
-            _topicRouteUpdateCtx.Cancel();
+            _topicRouteUpdateCts.Cancel();
             _heartbeatCts.Cancel();
             _telemetryCts.Cancel();
             NotifyClientTermination();
@@ -221,6 +227,15 @@ namespace Org.Apache.Rocketmq
             }
         }
 
+        private void Stats()
+        {
+            ThreadPool.GetAvailableThreads(out var availableWorker, out var availableIo);
+            Logger.Info($"ThreadCount={ThreadPool.ThreadCount}, " +
+                        $"CompletedWorkItemCount={ThreadPool.CompletedWorkItemCount}, " +
+                        $"PendingWorkItemCount={ThreadPool.PendingWorkItemCount}, AvailableWorkerThreads={availableWorker}, " +
+                        $"AvailableCompletionPortThreads={availableIo}, ClientId={ClientId}");
+        }
+
         private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
         {
             Task.Run(async () =>
