Thorbenl opened a new issue, #280:
URL: https://github.com/apache/pulsar-dotpulsar/issues/280
## Description
`Awaiter.AddTaskCompletionSource` (and `CreateTask`) call
`ConcurrentDictionary.TryAdd` and discard the return value. If an entry for
the same key `(ProducerId, SequenceId)` already exists, which happens when
messages are resent after a producer reconnect, the new TCS registration is
silently dropped.
This causes `ProducerSendReceiptOrderingException` during reconnect because:
1. Producer sends messages A(seq=1), B(seq=2), C(seq=3) — TCS entries
registered in `Awaiter`
2. Connection disruption occurs (e.g. auth token expiry triggering
`AuthChallenge` failure)
3. Receipt for A arrives and is processed, but entries for B and C remain in
`Awaiter` (connection didn't close cleanly, so `CloseProducer` never cancelled
them)
4. `EstablishNewChannel` reconnects and `MessageDispatcher` replays the send
queue
5. Resending B(seq=2) calls `AddTaskCompletionSource` with the same
`(ProducerId, SequenceId)` key — `TryAdd` returns `false`, new TCS is silently
dropped
6. When broker returns the receipt, `SetResult` completes the **old** TCS
(whose `ContinueWith` points to the now-disposed old `responseQueue`), so the
enqueue silently fails
7. The new `ResponseProcessor` never receives the receipt. It eventually gets
a receipt for a later sequence, which does not match the queue head, and throws
`ProducerSendReceiptOrderingException`
## Reproduction Steps
The bug can be demonstrated with a unit test (no broker needed):
```csharp
var sut = new Awaiter<string, string>();

// Simulate first send: registers a TCS for key "A"
var oldTask = sut.CreateTask("A");

// Simulate reconnect: tries to register a NEW TCS for the same key "A"
var newTcs = new TaskCompletionSource<string>();
sut.AddTaskCompletionSource("A", newTcs);

// Simulate broker returning the receipt: completes the OLD task, not the new one
sut.SetResult("A", "receipt");

// BUG: old task completes, new TCS is never completed
Console.WriteLine(oldTask.IsCompleted);     // true, but this TCS belonged to the old dispatcher
Console.WriteLine(newTcs.Task.IsCompleted); // false, the new dispatcher never gets notified
```
## Expected behavior
When `AddTaskCompletionSource` is called with a key that already exists, the
old entry should be replaced (or cancelled and replaced) so that the new TCS
receives the receipt.
## Actual behavior
`TryAdd` returns `false` and the new TCS is silently dropped. The old TCS
receives the receipt instead, but its continuation targets a disposed response
queue, so the receipt is effectively lost.
The producer enters an infinite reconnect loop: reconnect → resend → TCS
dropped → receipt lost → ordering exception → reconnect.
## Regression?
Unknown — the `TryAdd` pattern has been present since at least v3.x. The
issue becomes visible under connection instability (e.g. auth token rotation on
Kubernetes with OIDC/WorkloadIdentity).
## Known Workarounds
None that fully resolve the issue. Setting `FaultAction.Retry` for
`ProducerSendReceiptOrderingException` in the exception handler only adds a
delay before the same cycle repeats.
## Configuration
- DotPulsar 5.1.2
- .NET 10, Linux (Kubernetes)
- Apache Pulsar broker with OAuth2/OIDC authentication
- Architecture: x64
## Other information
The fix is straightforward: replace `_ = _items.TryAdd(item, tcs)` with
indexer assignment or `AddOrUpdate`. The old TCS should also be cancelled to
avoid orphaned continuations.
Relevant code: `src/DotPulsar/Internal/Awaiter.cs`, lines 29 and 35.
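
A minimal sketch of the replace-and-cancel approach described above. `ReplacingAwaiter` is a hypothetical stand-in written for illustration, not the actual contents of `Awaiter.cs`; only the `AddTaskCompletionSource` and `SetResult` method names and the `_items` field name are taken from this report, and the real class may differ in shape.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the internal Awaiter (assumption, not DotPulsar code).
public sealed class ReplacingAwaiter<TKey, TResult> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, TaskCompletionSource<TResult>> _items = new();

    public void AddTaskCompletionSource(TKey key, TaskCompletionSource<TResult> tcs)
    {
        TaskCompletionSource<TResult>? replaced = null;

        // AddOrUpdate stores the newest registration instead of dropping it.
        // The update delegate may run more than once under contention, so it
        // only records the entry it saw; cancellation happens afterwards.
        _items.AddOrUpdate(
            key,
            tcs,
            (_, existing) =>
            {
                replaced = existing;
                return tcs;
            });

        // Cancel the entry that was superseded so its continuations do not
        // linger. TrySetCanceled is a no-op if that TCS already completed.
        if (replaced is not null && !ReferenceEquals(replaced, tcs))
            replaced.TrySetCanceled();
    }

    public void SetResult(TKey key, TResult result)
    {
        // Complete and remove whichever TCS is currently registered for the key.
        if (_items.TryRemove(key, out var tcs))
            tcs.TrySetResult(result);
    }
}
```

With this shape, registering a second TCS for key `"A"` and then calling `SetResult("A", "receipt")` would complete the second TCS and leave the first one cancelled, which is the behaviour requested under "Expected behavior". Plain indexer assignment (`_items[key] = tcs`) would also replace the entry, but it does not hand back the old TCS, so cancelling it would need a separate lookup.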