This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new 023ffe2 Fix memory leak in Producer (#140) 023ffe2 is described below commit 023ffe2b09dda4b2dd06da09ba790f1d0e6424d1 Author: Kristian Andersen <kanderse...@users.noreply.github.com> AuthorDate: Fri Feb 17 12:12:04 2023 +0100 Fix memory leak in Producer (#140) * Fix memory leak in Producer Cancellation token registration for Send operation was not disposed * Found one more Fixed additional memory leak Ensure token registration is disposed even if TaskCompletionSource task is cancelled. --- CHANGELOG.md | 6 ++++++ src/DotPulsar/Internal/AsyncQueueWithCursor.cs | 15 ++++++++++++--- src/DotPulsar/Internal/Producer.cs | 11 +++++++++-- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3047925..87ff85a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.10.2] - 2023-02-17 + +### Fixed + +- Fixed a memory leak introduced in 2.8.0 with internal rewrite of producer functionality + ## [2.10.1] - 2023-02-15 ### Fixed diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs index 1765c58..f7f3824 100644 --- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs +++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs @@ -221,7 +221,8 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos /// </summary> public async Task WaitForEmpty(CancellationToken cancellationToken) { - var tcs = new TaskCompletionSource<object>(); + TaskCompletionSource<object> tcs; + CancellationTokenRegistration registration; lock (_queue) { ThrowIfDisposed(); @@ -229,11 +230,19 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos if (_queue.Count == 0) return; - cancellationToken.Register(() => tcs.TrySetCanceled()); + tcs = new TaskCompletionSource<object>(); + registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)); _queueEmptyTcs.Add(tcs); } - await tcs.Task.ConfigureAwait(false); + try + { + await tcs.Task.ConfigureAwait(false); + } + finally + { + registration.Dispose(); + } } public async ValueTask DisposeAsync() diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs index be92548..f0a8926 100644 --- a/src/DotPulsar/Internal/Producer.cs +++ b/src/DotPulsar/Internal/Producer.cs @@ -243,7 +243,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource<MessageId>(); - cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)); + var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)); ValueTask OnMessageSent(MessageId messageId) { @@ -257,7 +257,14 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent await InternalSend(metadata, message, true, OnMessageSent, cancellationToken).ConfigureAwait(false); - return await tcs.Task.ConfigureAwait(false); + try + { + return await tcs.Task.ConfigureAwait(false); + } + finally + { + registration.Dispose(); + } } public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken cancellationToken = default)