Thorbenl opened a new issue, #280:
URL: https://github.com/apache/pulsar-dotpulsar/issues/280

   ## Description
   
   `Awaiter.AddTaskCompletionSource` (and `CreateTask`) call `ConcurrentDictionary.TryAdd` and silently discard its return value. If an entry for the same `(ProducerId, SequenceId)` key already exists, which happens when messages are resent after a producer reconnect, `TryAdd` returns `false` and the new TCS registration is dropped without any error.
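
   For reference, `ConcurrentDictionary.TryAdd` never overwrites an existing entry: it returns `false` and keeps the value already stored, whereas the indexer setter replaces it. The standalone snippet below illustrates this. The `(ulong, ulong)` value tuple used as the `(ProducerId, SequenceId)` key and the strings standing in for TCS instances are assumptions for illustration, not DotPulsar's actual types.

   ```csharp
   using System;
   using System.Collections.Concurrent;

   // Assumption: a (ulong, ulong) tuple stands in for the (ProducerId, SequenceId) key,
   // and strings stand in for TaskCompletionSource instances.
   var items = new ConcurrentDictionary<(ulong ProducerId, ulong SequenceId), string>();
   var key = (ProducerId: 1UL, SequenceId: 2UL);

   // First send of B(seq=2): the key is new, so TryAdd stores the value and returns true.
   bool firstAdded = items.TryAdd(key, "old TCS");

   // Resend of B(seq=2) after a reconnect: the key exists, so TryAdd returns false
   // and leaves the stored value untouched. Discarding the result hides this.
   bool secondAdded = items.TryAdd(key, "new TCS");
   Console.WriteLine($"{firstAdded} {secondAdded} {items[key]}"); // True False old TCS

   // The indexer setter overwrites the existing entry instead.
   items[key] = "new TCS";
   Console.WriteLine(items[key]); // new TCS
   ```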
   
   This causes `ProducerSendReceiptOrderingException` during reconnect because:
   
   1. The producer sends messages A(seq=1), B(seq=2) and C(seq=3); a TCS entry for each is registered in `Awaiter`.
   2. A connection disruption occurs (e.g. an auth token expiry triggering an `AuthChallenge` failure).
   3. The receipt for A arrives and is processed, but the entries for B and C remain in `Awaiter` (the connection didn't close cleanly, so `CloseProducer` never cancelled them).
   4. `EstablishNewChannel` reconnects and `MessageDispatcher` replays the send queue.
   5. Resending B(seq=2) calls `AddTaskCompletionSource` with the same `(ProducerId, SequenceId)` key; `TryAdd` returns `false` and the new TCS is silently dropped.
   6. When the broker returns the receipt, `SetResult` completes the **old** TCS, whose `ContinueWith` targets the now-disposed old `responseQueue`, so the enqueue silently fails.
   7. The new `ResponseProcessor` never receives that receipt. It eventually gets a receipt for a later sequence number, which does not match the head of its queue, and throws `ProducerSendReceiptOrderingException`.
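
   Steps 5 to 7 can be reproduced in isolation with a simplified model. In this sketch each dispatcher's response queue is modelled as a `System.Threading.Channels` channel, and completing the old channel's writer stands in for disposing the old `responseQueue`. The names `oldQueue`, `newQueue` and `pending` are hypothetical and do not correspond to DotPulsar internals.

   ```csharp
   using System;
   using System.Collections.Concurrent;
   using System.Threading.Channels;
   using System.Threading.Tasks;

   // Hypothetical model: a Channel stands in for each dispatcher's response queue,
   // and a ConcurrentDictionary keyed by sequence id stands in for Awaiter.
   var oldQueue = Channel.CreateUnbounded<ulong>();
   var newQueue = Channel.CreateUnbounded<ulong>();
   var pending = new ConcurrentDictionary<ulong, TaskCompletionSource<ulong>>();

   // Before the disruption: the old dispatcher registers a TCS for seq=2
   // and forwards the receipt to its own queue when the TCS completes.
   var oldTcs = new TaskCompletionSource<ulong>(TaskCreationOptions.RunContinuationsAsynchronously);
   pending.TryAdd(2UL, oldTcs);
   Task<bool> oldForward = oldTcs.Task.ContinueWith(t => oldQueue.Writer.TryWrite(t.Result));

   // Reconnect: the old queue stops accepting items (stand-in for disposal).
   oldQueue.Writer.Complete();

   // Step 5: the resend registers a new TCS for the same key; TryAdd returns false.
   var newTcs = new TaskCompletionSource<ulong>(TaskCreationOptions.RunContinuationsAsynchronously);
   bool registered = pending.TryAdd(2UL, newTcs);
   _ = newTcs.Task.ContinueWith(t => newQueue.Writer.TryWrite(t.Result));

   // Step 6: the receipt for seq=2 completes whichever TCS is stored, i.e. the old one.
   if (pending.TryRemove(2UL, out var stored))
       stored.SetResult(2UL);

   // Step 7: the old continuation runs but its write is rejected; the new queue stays empty.
   bool oldWriteAccepted = await oldForward;
   bool newQueueHasReceipt = newQueue.Reader.TryRead(out _);
   Console.WriteLine($"registered={registered} oldWriteAccepted={oldWriteAccepted} newQueueHasReceipt={newQueueHasReceipt}");
   // registered=False oldWriteAccepted=False newQueueHasReceipt=False
   ```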
   
   ## Reproduction Steps
   
   The bug can be demonstrated with a unit test (no broker needed):
   
   ```csharp
   using System;
   using System.Threading.Tasks;
   using DotPulsar.Internal; // Awaiter lives in src/DotPulsar/Internal/Awaiter.cs

   var sut = new Awaiter<string, string>();

   // Simulate the first send: registers a TCS for key "A"
   var oldTask = sut.CreateTask("A");

   // Simulate the resend after reconnect: tries to register a NEW TCS for the same key "A"
   var newTcs = new TaskCompletionSource<string>();
   sut.AddTaskCompletionSource("A", newTcs);

   // Simulate the broker returning the receipt: completes the OLD task, not the new one
   sut.SetResult("A", "receipt");

   // BUG: the old task completes, but the new TCS is never completed
   Console.WriteLine(oldTask.IsCompleted);     // True, but this TCS belonged to the old dispatcher
   Console.WriteLine(newTcs.Task.IsCompleted); // False, so the new dispatcher is never notified
   ```
   
   ## Expected behavior
   
   When `AddTaskCompletionSource` is called with a key that already exists, the old entry should be replaced (or cancelled and then replaced) so that the new TCS receives the receipt.
   
   ## Actual behavior
   
   `TryAdd` returns `false` and the new TCS is silently dropped. The old TCS receives the receipt instead, but its continuation targets a disposed response queue, so the receipt is effectively lost.
   
   The producer enters an infinite reconnect loop: reconnect → resend → TCS dropped → receipt lost → ordering exception → reconnect.
   
   ## Regression?
   
   Unknown. The `TryAdd` pattern has been present since at least v3.x. The issue becomes visible under connection instability (e.g. auth token rotation on Kubernetes with OIDC/WorkloadIdentity).
   
   ## Known Workarounds
   
   None that fully resolve the issue. Setting `FaultAction.Retry` for `ProducerSendReceiptOrderingException` in the exception handler only adds a delay before the same cycle repeats.
   
   ## Configuration
   
   - DotPulsar 5.1.2
   - .NET 10, Linux (Kubernetes)
   - Apache Pulsar broker with OAuth2/OIDC authentication
   - Architecture: x64
   
   ## Other information
   
   The fix is straightforward: replace `_ = _items.TryAdd(item, tcs)` with indexer assignment or `AddOrUpdate`. The old TCS should also be cancelled to avoid orphaned continuations.
   
   Relevant code: `src/DotPulsar/Internal/Awaiter.cs`, lines 29 and 35.
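
   A minimal sketch of the proposed fix, written as standalone C# rather than against the real `Awaiter` class: the dictionary and the two local functions are hypothetical stand-ins for `Awaiter`'s internals, with string keys as in the reproduction test. Instead of `AddOrUpdate`, it uses a `TryAdd`/`TryUpdate` loop. `AddOrUpdate` may invoke its update delegate more than once under contention, whereas the loop makes it unambiguous which TCS was actually replaced, so exactly that one is cancelled.

   ```csharp
   using System;
   using System.Collections.Concurrent;
   using System.Threading.Tasks;

   // Hypothetical stand-in for the dictionary inside Awaiter<string, string>.
   var items = new ConcurrentDictionary<string, TaskCompletionSource<string>>();

   // Proposed AddTaskCompletionSource: always store the new TCS and cancel the one it replaces.
   void AddTaskCompletionSource(string item, TaskCompletionSource<string> tcs)
   {
       while (true)
       {
           // No existing entry: a plain add is enough.
           if (items.TryAdd(item, tcs))
               return;

           // Existing entry: swap it out only if it is still the value we read,
           // then cancel it so its continuations do not wait forever.
           if (items.TryGetValue(item, out var existing))
           {
               if (ReferenceEquals(existing, tcs))
                   return; // same TCS registered twice; nothing to replace
               if (items.TryUpdate(item, tcs, existing))
               {
                   existing.TrySetCanceled();
                   return;
               }
           }
           // Another thread changed or removed the entry in between; retry.
       }
   }

   // Hypothetical SetResult, assumed to behave like Awaiter.SetResult:
   // remove the entry and complete the TCS that was stored for it.
   void SetResult(string item, string result)
   {
       if (items.TryRemove(item, out var tcs))
           tcs.TrySetResult(result);
   }

   var oldTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
   var newTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
   AddTaskCompletionSource("A", oldTcs); // first send
   AddTaskCompletionSource("A", newTcs); // resend after reconnect
   SetResult("A", "receipt");            // broker receipt

   Console.WriteLine(oldTcs.Task.IsCanceled); // True
   Console.WriteLine(newTcs.Task.Result);     // receipt
   ```

   Continuations attached to the replaced TCS still run, but now observe a cancelled task, so any continuation that reads `t.Result` unconditionally would throw; the real dispatcher's continuations would need to tolerate cancellation for this approach to be clean.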


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
