This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2d04992 Fix broken tests
2d04992 is described below
commit 2d04992c537f7573920fa5a49278b5999e8b7fc3
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Aug 23 23:28:18 2023 +0200
Fix broken tests
---
src/DotPulsar/Internal/Consumer.cs | 3 +++
src/DotPulsar/Internal/Producer.cs | 4 +---
src/DotPulsar/Internal/Reader.cs | 3 +++
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 3a7c405..e0736ff 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -174,6 +174,9 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_state.SetState(ConsumerState.Closed);
+ if (_subConsumers is null)
+ return;
+
foreach (var subConsumer in _subConsumers)
{
await subConsumer.DisposeAsync().ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 0c7a0f2..3d6b1f5 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -117,12 +117,10 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
var monitoringTasks = new Task<ProducerState>[numberOfSubProducers];
var states = new ProducerState[numberOfSubProducers];
- var topic = Topic;
-
for (var i = 0; i < numberOfSubProducers; ++i)
{
var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
- var producer = CreateSubProducer(topic, isPartitionedTopic ? i : -1);
+ var producer = CreateSubProducer(topicName, isPartitionedTopic ? i : -1);
_ = _producers.TryAdd(i, producer);
monitoringTasks[i] = producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index ca0aefc..9bba145 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -276,6 +276,9 @@ public sealed class Reader<TMessage> : IReader<TMessage>
_state.SetState(ReaderState.Closed);
+ if (_subReaders is null)
+ return;
+
foreach (var subConsumer in _subReaders)
{
await subConsumer.DisposeAsync().ConfigureAwait(false);