This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c76aef  Aligning state management with the producer
5c76aef is described below

commit 5c76aef5b633778eafc08d865ec4e868850eb40c
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Aug 23 15:56:02 2023 +0200

    Aligning state management with the producer
---
 src/DotPulsar/Internal/Producer.cs | 62 ++++++++++++++------------------------
 1 file changed, 23 insertions(+), 39 deletions(-)

diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index 7fcda17..0c7a0f2 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -113,25 +113,22 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     {
         var numberOfPartitions = await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
         var isPartitionedTopic = numberOfPartitions != 0;
-        var monitoringTasks = new Task<ProducerState>[isPartitionedTopic ? 
numberOfPartitions : 1];
+        var numberOfSubProducers = isPartitionedTopic ? numberOfPartitions : 1;
+        var monitoringTasks = new Task<ProducerState>[numberOfSubProducers];
+        var states = new ProducerState[numberOfSubProducers];
 
         var topic = Topic;
 
-        for (var partition = 0; partition < monitoringTasks.Length; 
++partition)
+        for (var i = 0; i < numberOfSubProducers; ++i)
         {
-            if (isPartitionedTopic)
-                topic = $"{Topic}-partition-{partition}";
-
-            var producer = CreateSubProducer(topic, isPartitionedTopic ? 
partition : -1);
-            _ = _producers.TryAdd(partition, producer);
-            monitoringTasks[partition] = 
producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+            var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) : 
Topic;
+            var producer = CreateSubProducer(topic, isPartitionedTopic ? i : 
-1);
+            _ = _producers.TryAdd(i, producer);
+            monitoringTasks[i] = 
producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
         }
 
         Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
 
-        var connectedProducers = 0;
-        var waitingForExclusive = new bool[isPartitionedTopic ? 
numberOfPartitions : 1];
-
         while (true)
         {
             await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
@@ -143,39 +140,24 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
                     continue;
 
                 var state = task.Result;
-                switch (state)
-                {
-                    case ProducerState.Connected:
-                        if (waitingForExclusive[i])
-                            waitingForExclusive[i] = false;
-                        else
-                            ++connectedProducers;
-                        break;
-                    case ProducerState.Disconnected:
-                        --connectedProducers;
-                        waitingForExclusive[i] = false;
-                        break;
-                    case ProducerState.WaitingForExclusive:
-                        ++connectedProducers;
-                        waitingForExclusive[i] = true;
-                        break;
-                    case ProducerState.Fenced:
-                    case ProducerState.Faulted:
-                        _state.SetState(state);
-                        return;
-                }
-
+                states[i] = state;
                 monitoringTasks[i] = _producers[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
             }
 
-            if (connectedProducers == 0)
-                _state.SetState(ProducerState.Disconnected);
-            else if (connectedProducers == monitoringTasks.Length && 
waitingForExclusive.All(x => x != true))
+            if (!isPartitionedTopic)
+                _state.SetState(states[0]);
+            else if (states.Any(x => x == ProducerState.Faulted))
+                _state.SetState(ProducerState.Faulted);
+            else if (states.Any(x => x == ProducerState.Fenced))
+                _state.SetState(ProducerState.Fenced);
+            else if (states.All(x => x == ProducerState.Connected))
                 _state.SetState(ProducerState.Connected);
-            else if (waitingForExclusive.Any(x => x))
-                _state.SetState(ProducerState.WaitingForExclusive);
-            else
+            else if (states.All(x => x == ProducerState.Disconnected))
+                _state.SetState(ProducerState.Disconnected);
+            else if (states.Any(x => x == ProducerState.Disconnected))
                 _state.SetState(ProducerState.PartiallyConnected);
+            else
+                _state.SetState(ProducerState.WaitingForExclusive);
         }
     }
 
@@ -373,5 +355,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     private static StateManager<ProducerState> CreateStateManager()
         => new(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted, ProducerState.Fenced);
 
+    private string GetPartitionedTopicName(int partitionNumber) => 
$"{Topic}-partition-{partitionNumber}";
+
     public void Register(IEvent @event) { }
 }

Reply via email to