This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19d8e30 Fix Awaiter silently dropping duplicate TCS entries (#281)
19d8e30 is described below
commit 19d8e30073021cb3c27167eb5135ec8a5e52eec3
Author: Thorben Luepkes <[email protected]>
AuthorDate: Mon Feb 16 12:05:20 2026 +0100
Fix Awaiter silently dropping duplicate TCS entries (#281)
Awaiter.CreateTask and Awaiter.AddTaskCompletionSource used
ConcurrentDictionary.TryAdd, which silently discards the
new TaskCompletionSource (TCS) when the key already exists.
During producer reconnection, messages are resent with
the same sequence IDs. Because the entries for those IDs
are still registered, the new TCS registrations are
silently dropped, so broker receipts complete the old
(already cancelled) TCS instead of the new one. This
causes ProducerSendReceiptOrderingException because
SubProducer.ProcessReceipt never sees the receipt for
the head of the send queue.
The fix introduces a private AddOrReplace method that
removes and cancels any existing TCS before adding the
new one, ensuring resent messages always have a live
TCS to receive broker receipts.
Fix #280
---
src/DotPulsar/Internal/Awaiter.cs | 12 +-
tests/DotPulsar.Tests/Internal/AwaiterTests.cs | 160 +++++++++++++++++++++++++
2 files changed, 170 insertions(+), 2 deletions(-)
diff --git a/src/DotPulsar/Internal/Awaiter.cs b/src/DotPulsar/Internal/Awaiter.cs
index 50c246d..6a184a9 100644
--- a/src/DotPulsar/Internal/Awaiter.cs
+++ b/src/DotPulsar/Internal/Awaiter.cs
@@ -26,13 +26,21 @@ public sealed class Awaiter<T, TResult> : IDisposable where T : notnull
public Task<TResult> CreateTask(T item)
{
var tcs = new
TaskCompletionSource<TResult>(TaskCreationOptions.RunContinuationsAsynchronously);
- _ = _items.TryAdd(item, tcs);
+ AddOrReplace(item, tcs);
return tcs.Task;
}
public void AddTaskCompletionSource(T item, TaskCompletionSource<TResult>
tcs)
{
- _ = _items.TryAdd(item, tcs);
+ AddOrReplace(item, tcs);
+ }
+
+ private void AddOrReplace(T item, TaskCompletionSource<TResult> tcs)
+ {
+ if (_items.TryRemove(item, out var oldTcs))
+ oldTcs.TrySetCanceled();
+
+ _items.TryAdd(item, tcs);
}
public void SetResult(T item, TResult result)
diff --git a/tests/DotPulsar.Tests/Internal/AwaiterTests.cs b/tests/DotPulsar.Tests/Internal/AwaiterTests.cs
new file mode 100644
index 0000000..ce59438
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/AwaiterTests.cs
@@ -0,0 +1,160 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal;
+
+using DotPulsar.Internal;
+
+[Trait("Category", "Unit")]
+public class AwaiterTests
+{
+ [Fact]
+ public void SetResult_GivenSingleEntry_ShouldCompleteTask()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ var task = sut.CreateTask("key1");
+
+ //Act
+ sut.SetResult("key1", "value1");
+
+ //Assert
+ task.IsCompletedSuccessfully.ShouldBeTrue();
+ task.Result.ShouldBe("value1");
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void Cancel_GivenSingleEntry_ShouldCancelTask()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ var task = sut.CreateTask("key1");
+
+ //Act
+ sut.Cancel("key1");
+
+ //Assert
+ task.IsCanceled.ShouldBeTrue();
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void SetResult_GivenNoMatchingEntry_ShouldNotThrow()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+
+ //Act & Assert
+ Should.NotThrow(() => sut.SetResult("nonexistent", "value"));
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void
AddTaskCompletionSource_GivenDuplicateKey_ShouldReplaceOldEntry()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ var oldTask = sut.CreateTask("key1");
+ var newTcs = new TaskCompletionSource<string>();
+
+ //Act
+ sut.AddTaskCompletionSource("key1", newTcs);
+
+ //Assert — old TCS should be cancelled since it was replaced
+ oldTask.IsCanceled.ShouldBeTrue();
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void
AddTaskCompletionSource_GivenDuplicateKey_ShouldCompleteNewTcsOnSetResult()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ _ = sut.CreateTask("key1");
+ var newTcs = new TaskCompletionSource<string>();
+ sut.AddTaskCompletionSource("key1", newTcs);
+
+ //Act
+ sut.SetResult("key1", "receipt");
+
+ //Assert — the NEW TCS should receive the result
+ newTcs.Task.IsCompletedSuccessfully.ShouldBeTrue();
+ newTcs.Task.Result.ShouldBe("receipt");
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void CreateTask_GivenDuplicateKey_ShouldReplaceOldEntry()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ var oldTask = sut.CreateTask("key1");
+
+ //Act
+ var newTask = sut.CreateTask("key1");
+
+ //Assert — old task should be cancelled, new task should be pending
+ oldTask.IsCanceled.ShouldBeTrue();
+ newTask.IsCompleted.ShouldBeFalse();
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void CreateTask_GivenDuplicateKey_ShouldCompleteNewTaskOnSetResult()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ _ = sut.CreateTask("key1");
+ var newTask = sut.CreateTask("key1");
+
+ //Act
+ sut.SetResult("key1", "receipt");
+
+ //Assert
+ newTask.IsCompletedSuccessfully.ShouldBeTrue();
+ newTask.Result.ShouldBe("receipt");
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void Dispose_GivenPendingEntries_ShouldCancelAll()
+ {
+ //Arrange
+ var sut = new Awaiter<string, string>();
+ var task1 = sut.CreateTask("key1");
+ var task2 = sut.CreateTask("key2");
+
+ //Act
+ sut.Dispose();
+
+ //Assert
+ task1.IsCanceled.ShouldBeTrue();
+ task2.IsCanceled.ShouldBeTrue();
+ }
+}