This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a8666f1  Consumer and Reader state system improved to handle 
partitioned topics (#170)
a8666f1 is described below

commit a8666f13d3f8f8c14a9bddf958ed7c15978e6129
Author: entvex <[email protected]>
AuthorDate: Wed Aug 23 14:09:08 2023 +0200

    Consumer and Reader state system improved to handle partitioned topics 
(#170)
    
    During the work on the state system the following bugs were found and fixed:
    It seems that the Reader and Consumer failed to close their state properly 
during disposal.
    SubReader and subConsumer may have mistakenly triggered the state system 
for the Reader and Consumer, respectively.
    
    Co-authored-by: David Jensen <[email protected]>
---
 src/DotPulsar/ConsumerState.cs     |  5 ++++
 src/DotPulsar/Internal/Consumer.cs | 55 ++++++++++++++++++++++++++++++++++----
 src/DotPulsar/Internal/Reader.cs   | 50 ++++++++++++++++++++++++++++++----
 src/DotPulsar/ReaderState.cs       |  5 ++++
 4 files changed, 105 insertions(+), 10 deletions(-)

diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
index d251e22..77add56 100644
--- a/src/DotPulsar/ConsumerState.cs
+++ b/src/DotPulsar/ConsumerState.cs
@@ -39,6 +39,11 @@ public enum ConsumerState : byte
     /// </summary>
     Faulted,
 
+    /// <summary>
+    /// Some of the sub-consumers are disconnected.
+    /// </summary>
+    PartiallyConnected,
+
     /// <summary>
     /// The consumer is connected and inactive. The subscription type is 
'Failover' and this consumer is not the active consumer.
     /// </summary>
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index b1aaf59..71f6cee 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -20,6 +20,7 @@ using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -100,6 +101,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _numberOfPartitions = Convert.ToInt32(await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
         _isPartitioned = _numberOfPartitions != 0;
         _receiveTaskQueueForSubConsumers = new 
Task<IMessage<TMessage>>[_numberOfPartitions];
+        var consumerStateTasks = new Task<ConsumerState>[_numberOfPartitions];
+        ConsumerState[] subConsumerStates;
 
         for (var i = 0; i < _receiveTaskQueueForSubConsumers.Length; i++)
         {
@@ -110,20 +113,62 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
         {
             _subConsumerIndex = -1;
             _subConsumers = new SubConsumer<TMessage>[_numberOfPartitions];
+            subConsumerStates = new ConsumerState[_numberOfPartitions];
             for (var partition = 0; partition < _numberOfPartitions; 
partition++)
             {
                 var partitionedTopicName = GetPartitionedTopicName(partition);
                 _subConsumers[partition] = 
CreateSubConsumer(partitionedTopicName);
+                consumerStateTasks[partition] = 
_subConsumers[partition].OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
             }
         }
         else
         {
             _subConsumerIndex = 0;
+            consumerStateTasks = new Task<ConsumerState>[1];
+            subConsumerStates = new ConsumerState[1];
             _subConsumers = new SubConsumer<TMessage>[1];
             _subConsumers[0] = CreateSubConsumer(Topic);
+            consumerStateTasks[0] = 
_subConsumers[0].OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
         }
         _allSubConsumersAreReady = true;
         _semaphoreSlim.Release();
+
+        while (true)
+        {
+            await Task.WhenAny(consumerStateTasks).ConfigureAwait(false);
+
+            for (var i = 0; i < consumerStateTasks.Length; ++i)
+            {
+                var task = consumerStateTasks[i];
+                if (!task.IsCompleted)
+                    continue;
+
+                var state = task.Result;
+                subConsumerStates[i] = state;
+
+                consumerStateTasks[i] = 
_subConsumers[i].OnStateChangeFrom(state, _cts.Token).AsTask();
+            }
+
+            if (subConsumerStates.Any(x => x == ConsumerState.Faulted))
+                _state.SetState(ConsumerState.Faulted);
+            else if (subConsumerStates.All(x => x == ConsumerState.Active))
+                _state.SetState(ConsumerState.Active);
+            else if (subConsumerStates.All(x => x == 
ConsumerState.Disconnected))
+                _state.SetState(ConsumerState.Disconnected);
+            else if (subConsumerStates.All(x => x == ConsumerState.Inactive))
+                _state.SetState(ConsumerState.Inactive);
+            else if (subConsumerStates.All(x => x == 
ConsumerState.ReachedEndOfTopic))
+                _state.SetState(ConsumerState.ReachedEndOfTopic);
+            else if (subConsumerStates.Length > 1) //States for a partitioned 
topic
+            {
+                if (subConsumerStates.Any(x => x == ConsumerState.Active) && 
subConsumerStates.Any(x => x == ConsumerState.Disconnected) ||
+                    subConsumerStates.Any(x => x == 
ConsumerState.Disconnected) && subConsumerStates.Any(x => x == 
ConsumerState.Inactive) ||
+                    subConsumerStates.Any(x => x == ConsumerState.Inactive) && 
subConsumerStates.Any(x => x == ConsumerState.Active) && 
subConsumerStates.Any(x => x == ConsumerState.Disconnected))
+                    _state.SetState(ConsumerState.PartiallyConnected);
+                if (subConsumerStates.Any(x => x == ConsumerState.Active) && 
subConsumerStates.Any(x => x == ConsumerState.Inactive))
+                    _state.SetState(ConsumerState.Inactive);
+            }
+        }
     }
 
     public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken)
@@ -143,6 +188,11 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
+        _cts.Cancel();
+        _cts.Dispose();
+
+        _state.SetState(ConsumerState.Closed);
+
         foreach (var subConsumer in _subConsumers)
         {
             await subConsumer.DisposeAsync().ConfigureAwait(false);
@@ -386,13 +436,8 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
         var stateManager = CreateStateManager();
         var initialChannel = new NotReadyChannel<TMessage>();
         var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
-
         var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, 
_consumerOptions.SubscriptionName, topic,
             _processManager, initialChannel, executor, stateManager, 
consumerChannelFactory);
-
-        if (_consumerOptions.StateChangedHandler is not null)
-            _ = StateMonitor.MonitorConsumer(subConsumer, 
_consumerOptions.StateChangedHandler);
-
         var process = new ConsumerProcess(correlationId, stateManager, 
subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
         _processManager.Add(process);
         process.Start();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 30b6244..8a30ee0 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -20,6 +20,7 @@ using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -92,10 +93,11 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
         _numberOfPartitions = Convert.ToInt32(await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
-
         _isPartitioned = _numberOfPartitions != 0;
-
         _receiveTaskQueueForSubReaders = new 
Task<IMessage<TMessage>>[_numberOfPartitions];
+        var readerStateTasks = new Task<ReaderState>[_numberOfPartitions];
+        ReaderState[] subReaderStates;
+
         for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++)
         {
             _receiveTaskQueueForSubReaders[i] = 
_emptyTaskCompletionSource.Task;
@@ -105,21 +107,56 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         {
             _subReaderIndex = -1;
             _subReaders = new SubReader<TMessage>[_numberOfPartitions];
-
+            subReaderStates = new ReaderState[_numberOfPartitions];
             for (var partition = 0; partition < _numberOfPartitions; 
partition++)
             {
                 var partitionedTopicName = GetPartitionedTopicName(partition);
                 _subReaders[partition] = CreateSubReader(partitionedTopicName);
+                readerStateTasks[partition] = 
_subReaders[partition].OnStateChangeFrom(ReaderState.Disconnected, 
_cts.Token).AsTask();
             }
         }
         else
         {
             _subReaderIndex = 0;
+            readerStateTasks = new Task<ReaderState>[1];
+            subReaderStates = new ReaderState[1];
             _subReaders = new SubReader<TMessage>[1];
             _subReaders[0] = CreateSubReader(Topic);
+            readerStateTasks[0] = 
_subReaders[0].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
         }
         _allSubReadersAreReady = true;
         _semaphoreSlim.Release();
+
+        while (true)
+        {
+            await Task.WhenAny(readerStateTasks).ConfigureAwait(false);
+
+            for (var i = 0; i < readerStateTasks.Length; ++i)
+            {
+                var task = readerStateTasks[i];
+                if (!task.IsCompleted)
+                    continue;
+
+                var state = task.Result;
+                subReaderStates[i] = state;
+
+                readerStateTasks[i] = _subReaders[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
+            }
+
+            if (subReaderStates.Any(x => x == ReaderState.Faulted))
+                _state.SetState(ReaderState.Faulted);
+            else if (subReaderStates.All(x => x == ReaderState.Connected))
+                _state.SetState(ReaderState.Connected);
+            else if (subReaderStates.All(x => x == ReaderState.Disconnected))
+                _state.SetState(ReaderState.Disconnected);
+            else if (subReaderStates.All(x => x == 
ReaderState.ReachedEndOfTopic))
+                _state.SetState(ReaderState.ReachedEndOfTopic);
+            else if (subReaderStates.Length > 1) //States for a partitioned 
topic
+            {
+                if (subReaderStates.Any(x => x == ReaderState.Connected) && 
subReaderStates.Any(x => x == ReaderState.Disconnected))
+                    _state.SetState(ReaderState.PartiallyConnected);
+            }
+        }
     }
 
     public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, 
CancellationToken cancellationToken)
@@ -250,6 +287,11 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
+        _cts.Cancel();
+        _cts.Dispose();
+
+        _state.SetState(ReaderState.Closed);
+
         foreach (var subConsumer in _subReaders)
         {
             await subConsumer.DisposeAsync().ConfigureAwait(false);
@@ -281,8 +323,6 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         var initialChannel = new NotReadyChannel<TMessage>();
         var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
         var subReader = new SubReader<TMessage>(correlationId, ServiceUrl, 
topic, _processManager, initialChannel, executor, stateManager, factory);
-        if (_readerOptions.StateChangedHandler is not null)
-            _ = StateMonitor.MonitorReader(subReader, 
_readerOptions.StateChangedHandler);
         var process = new ReaderProcess(correlationId, stateManager, 
subReader);
         _processManager.Add(process);
         process.Start();
diff --git a/src/DotPulsar/ReaderState.cs b/src/DotPulsar/ReaderState.cs
index d30afbd..fdbcd4b 100644
--- a/src/DotPulsar/ReaderState.cs
+++ b/src/DotPulsar/ReaderState.cs
@@ -39,6 +39,11 @@ public enum ReaderState : byte
     /// </summary>
     Faulted,
 
+    /// <summary>
+    /// Some of the sub-readers are disconnected.
+    /// </summary>
+    PartiallyConnected,
+
     /// <summary>
     /// The reader has reached the end of the topic. This is a final state.
     /// </summary>

Reply via email to