This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8bef35 Handle disconnect -> close properly in ChoosePartitions (#225)
d8bef35 is described below
commit d8bef35128f1407b8e965ef6ca79490da75261ac
Author: Kristian Andersen <[email protected]>
AuthorDate: Wed Jun 12 14:45:35 2024 +0200
Handle disconnect -> close properly in ChoosePartitions (#225)
---
CHANGELOG.md | 7 +++++++
src/DotPulsar/Internal/Producer.cs | 5 ++++-
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0c0a1d8..957ba44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Fixed
+
+- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` causing an unintended
+ `DivideByZeroException`. It now correctly throws a `ProducerClosedException`.
+
## [3.3.0] - 2024-06-10
### Added
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 45db4b5..e9e2ad9 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -170,6 +170,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
process.Start();
return producer;
}
+
public bool IsFinalState()
=> _state.IsFinalState();
@@ -202,9 +203,11 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
{
if (_producerCount == 0)
{
- _ = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
+ var newState = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
if (_faultException is not null)
throw new ProducerFaultedException(_faultException);
+ if (newState == ProducerState.Closed)
+ throw new ProducerClosedException();
}
if (_producerCount == 1)