This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8bef35  Handle disconnect -> close properly in ChoosePartions (#225)
d8bef35 is described below

commit d8bef35128f1407b8e965ef6ca79490da75261ac
Author: Kristian Andersen <[email protected]>
AuthorDate: Wed Jun 12 14:45:35 2024 +0200

    Handle disconnect -> close properly in ChoosePartions (#225)
---
 CHANGELOG.md                       | 7 +++++++
 src/DotPulsar/Internal/Producer.cs | 5 ++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0c0a1d8..957ba44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this 
file.
 
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to 
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Fixed
+
+- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` 
dispose causing an unintended
+  `DivideByZeroException`. It now correctly throws a `ProducerClosedException`
+
 ## [3.3.0] - 2024-06-10
 
 ### Added
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index 45db4b5..e9e2ad9 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -170,6 +170,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         process.Start();
         return producer;
     }
+
     public bool IsFinalState()
         => _state.IsFinalState();
 
@@ -202,9 +203,11 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     {
         if (_producerCount == 0)
         {
-            _ = await _state.StateChangedFrom(ProducerState.Disconnected, 
cancellationToken).ConfigureAwait(false);
+            var newState = await 
_state.StateChangedFrom(ProducerState.Disconnected, 
cancellationToken).ConfigureAwait(false);
             if (_faultException is not null)
                 throw new ProducerFaultedException(_faultException);
+            if (newState == ProducerState.Closed)
+                throw new ProducerClosedException();
         }
 
         if (_producerCount == 1)

Reply via email to