This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5c76aef Aligning state management with the producer
5c76aef is described below
commit 5c76aef5b633778eafc08d865ec4e868850eb40c
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Aug 23 15:56:02 2023 +0200
Aligning state management with the producer
---
src/DotPulsar/Internal/Producer.cs | 62 ++++++++++++++------------------------
1 file changed, 23 insertions(+), 39 deletions(-)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 7fcda17..0c7a0f2 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -113,25 +113,22 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
{
var numberOfPartitions = await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
var isPartitionedTopic = numberOfPartitions != 0;
- var monitoringTasks = new Task<ProducerState>[isPartitionedTopic ?
numberOfPartitions : 1];
+ var numberOfSubProducers = isPartitionedTopic ? numberOfPartitions : 1;
+ var monitoringTasks = new Task<ProducerState>[numberOfSubProducers];
+ var states = new ProducerState[numberOfSubProducers];
var topic = Topic;
- for (var partition = 0; partition < monitoringTasks.Length;
++partition)
+ for (var i = 0; i < numberOfSubProducers; ++i)
{
- if (isPartitionedTopic)
- topic = $"{Topic}-partition-{partition}";
-
- var producer = CreateSubProducer(topic, isPartitionedTopic ?
partition : -1);
- _ = _producers.TryAdd(partition, producer);
- monitoringTasks[partition] =
producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+ var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) :
Topic;
+ var producer = CreateSubProducer(topic, isPartitionedTopic ? i :
-1);
+ _ = _producers.TryAdd(i, producer);
+ monitoringTasks[i] =
producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
}
Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
- var connectedProducers = 0;
- var waitingForExclusive = new bool[isPartitionedTopic ?
numberOfPartitions : 1];
-
while (true)
{
await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
@@ -143,39 +140,24 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
continue;
var state = task.Result;
- switch (state)
- {
- case ProducerState.Connected:
- if (waitingForExclusive[i])
- waitingForExclusive[i] = false;
- else
- ++connectedProducers;
- break;
- case ProducerState.Disconnected:
- --connectedProducers;
- waitingForExclusive[i] = false;
- break;
- case ProducerState.WaitingForExclusive:
- ++connectedProducers;
- waitingForExclusive[i] = true;
- break;
- case ProducerState.Fenced:
- case ProducerState.Faulted:
- _state.SetState(state);
- return;
- }
-
+ states[i] = state;
monitoringTasks[i] = _producers[i].OnStateChangeFrom(state,
_cts.Token).AsTask();
}
- if (connectedProducers == 0)
- _state.SetState(ProducerState.Disconnected);
- else if (connectedProducers == monitoringTasks.Length &&
waitingForExclusive.All(x => x != true))
+ if (!isPartitionedTopic)
+ _state.SetState(states[0]);
+ else if (states.Any(x => x == ProducerState.Faulted))
+ _state.SetState(ProducerState.Faulted);
+ else if (states.Any(x => x == ProducerState.Fenced))
+ _state.SetState(ProducerState.Fenced);
+ else if (states.All(x => x == ProducerState.Connected))
_state.SetState(ProducerState.Connected);
- else if (waitingForExclusive.Any(x => x))
- _state.SetState(ProducerState.WaitingForExclusive);
- else
+ else if (states.All(x => x == ProducerState.Disconnected))
+ _state.SetState(ProducerState.Disconnected);
+ else if (states.Any(x => x == ProducerState.Disconnected))
_state.SetState(ProducerState.PartiallyConnected);
+ else
+ _state.SetState(ProducerState.WaitingForExclusive);
}
}
@@ -373,5 +355,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
private static StateManager<ProducerState> CreateStateManager()
=> new(ProducerState.Disconnected, ProducerState.Closed,
ProducerState.Faulted, ProducerState.Fenced);
+ private string GetPartitionedTopicName(int partitionNumber) =>
$"{Topic}-partition-{partitionNumber}";
+
public void Register(IEvent @event) { }
}