This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9cb638b Fixed memory-leak when consuming from a partitioned topic
9cb638b is described below
commit 9cb638bf0de91e60f02c7b43613b01402cd6cb18
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon May 12 21:16:40 2025 +0200
Fixed memory-leak when consuming from a partitioned topic
---
src/DotPulsar/Internal/Consumer.cs | 44 +++++++++++++++++++++-----------------
1 file changed, 24 insertions(+), 20 deletions(-)
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 54b1d4c..70a1ee7 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -33,8 +33,10 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
private readonly SemaphoreSlim _semaphoreSlim;
private readonly AsyncLock _lock;
private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
- private readonly LinkedList<Task<IMessage<TMessage>>> _receiveTasks;
- private Dictionary<string, SubConsumer<TMessage>>.Enumerator
_receiveEnumerator;
+ private readonly TaskCompletionSource<IMessage<TMessage>> _neverEndingTask;
+ private SubConsumer<TMessage>[] _receivers;
+ private Task<IMessage<TMessage>>[] _receiveTasks;
+ private int _receiveIndex;
private SubConsumer<TMessage>? _singleSubConsumer;
private bool _allSubConsumersAreReady;
private int _isDisposed;
@@ -64,6 +66,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
Topic = consumerOptions.TopicsPattern.ToString();
else
Topic = string.Join(",", consumerOptions.Topics);
+ _neverEndingTask = new TaskCompletionSource<IMessage<TMessage>>();
+ _receivers = [];
_receiveTasks = [];
_cts = new CancellationTokenSource();
_exceptionHandler = exceptionHandler;
@@ -76,7 +80,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_allSubConsumersAreReady = false;
_isDisposed = 0;
_subConsumers = [];
- _receiveEnumerator = _subConsumers.GetEnumerator();
_singleSubConsumer = null;
_ = Setup();
@@ -134,6 +137,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
}
_numberOfSubConsumers = topics.Count;
+ _receiveTasks = Enumerable.Repeat(_neverEndingTask.Task,
_numberOfSubConsumers + 1).ToArray();
var monitoringTasks = new
Task<ConsumerStateChanged>[_numberOfSubConsumers];
var states = new ConsumerState[_numberOfSubConsumers];
@@ -148,7 +152,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
if (_numberOfSubConsumers == 1)
_singleSubConsumer = _subConsumers.First().Value;
- _receiveEnumerator = _subConsumers.GetEnumerator();
+ _receivers = _subConsumers.Values.ToArray();
_allSubConsumersAreReady = true;
_semaphoreSlim.Release();
@@ -217,39 +221,39 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
while (true)
{
- var receiveTaskNode = _receiveTasks.First;
- while (receiveTaskNode is not null)
+ for (var i = 0; i < _numberOfSubConsumers; ++i)
{
- if (receiveTaskNode.Value.IsCompleted)
+ var task = _receiveTasks[i];
+ if (task.IsCompleted)
{
- _receiveTasks.Remove(receiveTaskNode);
- return receiveTaskNode.Value.Result;
+ _receiveTasks[i] = _neverEndingTask.Task;
+ return task.Result;
}
- receiveTaskNode = receiveTaskNode.Next;
}
- for (var i = 0; i < _numberOfSubConsumers; ++i)
+ for (var i = 0; i < _numberOfSubConsumers; ++i,
++_receiveIndex)
{
- if (!_receiveEnumerator.MoveNext())
- {
- _receiveEnumerator = _subConsumers.GetEnumerator();
- _receiveEnumerator.MoveNext();
- }
+ if (_receiveIndex == _numberOfSubConsumers)
+ _receiveIndex = 0;
+
+ var task = _receiveTasks[_receiveIndex];
+ if (task != _neverEndingTask.Task)
+ continue;
- var subConsumer = _receiveEnumerator.Current.Value;
+ var subConsumer = _receivers[_receiveIndex];
var receiveTask = subConsumer.Receive(_cts.Token);
if (receiveTask.IsCompleted)
return receiveTask.Result;
- _receiveTasks.AddLast(receiveTask.AsTask());
+ _receiveTasks[_receiveIndex] = receiveTask.AsTask();
}
var tcs = new TaskCompletionSource<IMessage<TMessage>>();
using var registration = cancellationToken.Register(() =>
tcs.TrySetCanceled());
- _receiveTasks.AddLast(tcs.Task);
+ _receiveTasks[_numberOfSubConsumers] = tcs.Task;
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
- _receiveTasks.RemoveLast();
+ _receiveTasks[_numberOfSubConsumers] = _neverEndingTask.Task;
cancellationToken.ThrowIfCancellationRequested();
}
}