This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db32298  State management clean up
db32298 is described below

commit db32298a5598490a29ba5f4e68ef0ff6c0694f89
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Aug 23 15:13:56 2023 +0200

    State management clean up
---
 src/DotPulsar/Internal/Consumer.cs | 135 +++++++++++++++++--------------------
 src/DotPulsar/Internal/Reader.cs   |  98 +++++++++++----------------
 2 files changed, 101 insertions(+), 132 deletions(-)

diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 71f6cee..3a7c405 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -38,9 +38,9 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private SubConsumer<TMessage>[] _subConsumers;
     private bool _allSubConsumersAreReady;
     private int _isDisposed;
-    private bool _isPartitioned;
+    private bool _isPartitionedTopic;
     private int _numberOfPartitions;
-    private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubConsumers;
+    private Task<IMessage<TMessage>>[] _receiveTasks;
     private int _subConsumerIndex;
     private Exception? _faultException;
 
@@ -59,7 +59,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         ServiceUrl = serviceUrl;
         SubscriptionName = consumerOptions.SubscriptionName;
         Topic = consumerOptions.Topic;
-        _receiveTaskQueueForSubConsumers = 
Array.Empty<Task<IMessage<TMessage>>>();
+        _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
         _cts = new CancellationTokenSource();
         _exceptionHandler = exceptionHandler;
         _semaphoreSlim = new SemaphoreSlim(1);
@@ -68,7 +68,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _consumerOptions = consumerOptions;
         _connectionPool = connectionPool;
         _exceptionHandler = exceptionHandler;
-        _isPartitioned = false;
+        _isPartitionedTopic = false;
         _allSubConsumersAreReady = false;
         _isDisposed = 0;
         _subConsumers = null!;
@@ -96,78 +96,59 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
     private async Task Monitor()
     {
-        await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
+        await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
 
         _numberOfPartitions = Convert.ToInt32(await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
-        _isPartitioned = _numberOfPartitions != 0;
-        _receiveTaskQueueForSubConsumers = new 
Task<IMessage<TMessage>>[_numberOfPartitions];
-        var consumerStateTasks = new Task<ConsumerState>[_numberOfPartitions];
-        ConsumerState[] subConsumerStates;
-
-        for (var i = 0; i < _receiveTaskQueueForSubConsumers.Length; i++)
+        _isPartitionedTopic = _numberOfPartitions != 0;
+        var numberOfSubConsumers = _isPartitionedTopic ? _numberOfPartitions : 1;
+        _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubConsumers];
+        _subConsumers = new SubConsumer<TMessage>[numberOfSubConsumers];
+        var monitoringTasks = new Task<ConsumerState>[numberOfSubConsumers];
+        var states = new ConsumerState[numberOfSubConsumers];
+        _subConsumerIndex = _isPartitionedTopic ? -1 : 0;
+
+        for (var i = 0; i < numberOfSubConsumers; i++)
         {
-            _receiveTaskQueueForSubConsumers[i] = 
_emptyTaskCompletionSource.Task;
+            _receiveTasks[i] = _emptyTaskCompletionSource.Task;
+            var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
+            _subConsumers[i] = CreateSubConsumer(topicName);
+            monitoringTasks[i] = _subConsumers[i].OnStateChangeFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
         }
 
-        if (_isPartitioned)
-        {
-            _subConsumerIndex = -1;
-            _subConsumers = new SubConsumer<TMessage>[_numberOfPartitions];
-            subConsumerStates = new ConsumerState[_numberOfPartitions];
-            for (var partition = 0; partition < _numberOfPartitions; 
partition++)
-            {
-                var partitionedTopicName = GetPartitionedTopicName(partition);
-                _subConsumers[partition] = 
CreateSubConsumer(partitionedTopicName);
-                consumerStateTasks[partition] = 
_subConsumers[partition].OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
-            }
-        }
-        else
-        {
-            _subConsumerIndex = 0;
-            consumerStateTasks = new Task<ConsumerState>[1];
-            subConsumerStates = new ConsumerState[1];
-            _subConsumers = new SubConsumer<TMessage>[1];
-            _subConsumers[0] = CreateSubConsumer(Topic);
-            consumerStateTasks[0] = 
_subConsumers[0].OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
-        }
         _allSubConsumersAreReady = true;
         _semaphoreSlim.Release();
 
         while (true)
         {
-            await Task.WhenAny(consumerStateTasks).ConfigureAwait(false);
+            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
 
-            for (var i = 0; i < consumerStateTasks.Length; ++i)
+            for (var i = 0; i < numberOfSubConsumers; ++i)
             {
-                var task = consumerStateTasks[i];
+                var task = monitoringTasks[i];
                 if (!task.IsCompleted)
                     continue;
 
                 var state = task.Result;
-                subConsumerStates[i] = state;
-
-                consumerStateTasks[i] = 
_subConsumers[i].OnStateChangeFrom(state, _cts.Token).AsTask();
+                states[i] = state;
+                monitoringTasks[i] = _subConsumers[i].OnStateChangeFrom(state, _cts.Token).AsTask();
             }
 
-            if (subConsumerStates.Any(x => x == ConsumerState.Faulted))
+            if (!_isPartitionedTopic)
+                _state.SetState(states[0]);
+            else if (states.Any(x => x == ConsumerState.Faulted))
                 _state.SetState(ConsumerState.Faulted);
-            else if (subConsumerStates.All(x => x == ConsumerState.Active))
+            else if (states.All(x => x == ConsumerState.Active))
                 _state.SetState(ConsumerState.Active);
-            else if (subConsumerStates.All(x => x == 
ConsumerState.Disconnected))
-                _state.SetState(ConsumerState.Disconnected);
-            else if (subConsumerStates.All(x => x == ConsumerState.Inactive))
+            else if (states.All(x => x == ConsumerState.Inactive))
                 _state.SetState(ConsumerState.Inactive);
-            else if (subConsumerStates.All(x => x == 
ConsumerState.ReachedEndOfTopic))
+            else if (states.All(x => x == ConsumerState.ReachedEndOfTopic))
                 _state.SetState(ConsumerState.ReachedEndOfTopic);
-            else if (subConsumerStates.Length > 1) //States for a partitioned 
topic
-            {
-                if (subConsumerStates.Any(x => x == ConsumerState.Active) && 
subConsumerStates.Any(x => x == ConsumerState.Disconnected) ||
-                    subConsumerStates.Any(x => x == 
ConsumerState.Disconnected) && subConsumerStates.Any(x => x == 
ConsumerState.Inactive) ||
-                    subConsumerStates.Any(x => x == ConsumerState.Inactive) && 
subConsumerStates.Any(x => x == ConsumerState.Active) && 
subConsumerStates.Any(x => x == ConsumerState.Disconnected))
-                    _state.SetState(ConsumerState.PartiallyConnected);
-                if (subConsumerStates.Any(x => x == ConsumerState.Active) && 
subConsumerStates.Any(x => x == ConsumerState.Inactive))
-                    _state.SetState(ConsumerState.Inactive);
-            }
+            else if (states.All(x => x == ConsumerState.Disconnected))
+                _state.SetState(ConsumerState.Disconnected);
+            else if (states.Any(x => x == ConsumerState.Disconnected))
+                _state.SetState(ConsumerState.PartiallyConnected);
+            else
+                _state.SetState(ConsumerState.Inactive);
         }
     }
 
@@ -203,7 +184,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             return await 
_subConsumers[_subConsumerIndex].Receive(cancellationToken).ConfigureAwait(false);
 
         var iterations = 0;
@@ -214,24 +195,24 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             if (_subConsumerIndex == _subConsumers.Length)
                 _subConsumerIndex = 0;
 
-            var receiveTask = 
_receiveTaskQueueForSubConsumers[_subConsumerIndex];
+            var receiveTask = _receiveTasks[_subConsumerIndex];
             if (receiveTask == _emptyTaskCompletionSource.Task)
             {
                 var receiveTaskValueTask = 
_subConsumers[_subConsumerIndex].Receive(cancellationToken);
                 if (receiveTaskValueTask.IsCompleted)
                     return receiveTaskValueTask.Result;
-                _receiveTaskQueueForSubConsumers[_subConsumerIndex] = 
receiveTaskValueTask.AsTask();
+                _receiveTasks[_subConsumerIndex] = receiveTaskValueTask.AsTask();
             }
             else
             {
                 if (receiveTask.IsCompleted)
                 {
-                    _receiveTaskQueueForSubConsumers[_subConsumerIndex] = 
_emptyTaskCompletionSource.Task;
+                    _receiveTasks[_subConsumerIndex] = _emptyTaskCompletionSource.Task;
                     return receiveTask.Result;
                 }
             }
             if (iterations == _subConsumers.Length)
-                await 
Task.WhenAny(_receiveTaskQueueForSubConsumers).ConfigureAwait(false);
+                await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
         }
     }
 
@@ -239,7 +220,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             await _subConsumers[_subConsumerIndex].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
         else
             await _subConsumers[messageId.Partition].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
@@ -249,7 +230,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             await 
_subConsumers[_subConsumerIndex].AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
         else
             await 
_subConsumers[messageId.Partition].AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
@@ -259,7 +240,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await 
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(messageIds, 
cancellationToken).ConfigureAwait(false);
             return;
@@ -297,7 +278,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await 
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
             return;
@@ -313,26 +294,30 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await 
_subConsumers[_subConsumerIndex].Unsubscribe(cancellationToken).ConfigureAwait(false);
-            return;
         }
-
-        var unsubscribeTasks = new List<Task>(_numberOfPartitions);
-        foreach (var subConsumer in _subConsumers)
+        else
         {
-            var getLastMessageIdTask = 
subConsumer.Unsubscribe(cancellationToken);
-            unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
+            var unsubscribeTasks = new List<Task>(_numberOfPartitions);
+            foreach (var subConsumer in _subConsumers)
+            {
+                var getLastMessageIdTask = subConsumer.Unsubscribe(cancellationToken);
+                unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
+            }
+
+            await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false);
         }
-        await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false);
+
+        _state.SetState(ConsumerState.Unsubscribed);
     }
 
     public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await _subConsumers[_subConsumerIndex].Seek(messageId, 
cancellationToken).ConfigureAwait(false);
             return;
@@ -351,7 +336,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await _subConsumers[_subConsumerIndex].Seek(publishTime, 
cancellationToken).ConfigureAwait(false);
             return;
@@ -371,7 +356,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             return await 
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false);
 
         throw new NotSupportedException("GetLastMessageId can't be used on 
partitioned topics. Please use GetLastMessageIds");
@@ -381,7 +366,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             return new[] { await 
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)
 };
 
         var getLastMessageIdsTasks = new 
List<Task<MessageId>>(_numberOfPartitions);
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 8a30ee0..ca0aefc 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -37,9 +37,9 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     private readonly SemaphoreSlim _semaphoreSlim;
     private SubReader<TMessage>[] _subReaders;
     private bool _allSubReadersAreReady;
-    private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubReaders;
+    private Task<IMessage<TMessage>>[] _receiveTasks;
     private int _subReaderIndex;
-    private bool _isPartitioned;
+    private bool _isPartitionedTopic;
     private int _numberOfPartitions;
     private int _isDisposed;
     private Exception? _faultException;
@@ -62,7 +62,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         _exceptionHandler = exceptionHandler;
         _semaphoreSlim = new SemaphoreSlim(1);
         _state = CreateStateManager();
-        _receiveTaskQueueForSubReaders = 
Array.Empty<Task<IMessage<TMessage>>>();
+        _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
         _cts = new CancellationTokenSource();
         _executor = new Executor(Guid.Empty, _processManager, 
_exceptionHandler);
         _isDisposed = 0;
@@ -91,71 +91,55 @@ public sealed class Reader<TMessage> : IReader<TMessage>
 
     private async Task Monitor()
     {
-        await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
-        _numberOfPartitions = Convert.ToInt32(await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
-        _isPartitioned = _numberOfPartitions != 0;
-        _receiveTaskQueueForSubReaders = new 
Task<IMessage<TMessage>>[_numberOfPartitions];
-        var readerStateTasks = new Task<ReaderState>[_numberOfPartitions];
-        ReaderState[] subReaderStates;
+        await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
 
-        for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++)
+        _numberOfPartitions = Convert.ToInt32(await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
+        _isPartitionedTopic = _numberOfPartitions != 0;
+        var numberOfSubReaders = _isPartitionedTopic ? _numberOfPartitions : 1;
+        _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubReaders];
+        _subReaders = new SubReader<TMessage>[numberOfSubReaders];
+        var monitoringTasks = new Task<ReaderState>[numberOfSubReaders];
+        var states = new ReaderState[numberOfSubReaders];
+        _subReaderIndex = _isPartitionedTopic ? -1 : 0;
+
+        for (var i = 0; i < numberOfSubReaders; i++)
         {
-            _receiveTaskQueueForSubReaders[i] = 
_emptyTaskCompletionSource.Task;
+            _receiveTasks[i] = _emptyTaskCompletionSource.Task;
+            var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
+            _subReaders[i] = CreateSubReader(topicName);
+            monitoringTasks[i] = _subReaders[i].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
         }
 
-        if (_isPartitioned)
-        {
-            _subReaderIndex = -1;
-            _subReaders = new SubReader<TMessage>[_numberOfPartitions];
-            subReaderStates = new ReaderState[_numberOfPartitions];
-            for (var partition = 0; partition < _numberOfPartitions; 
partition++)
-            {
-                var partitionedTopicName = GetPartitionedTopicName(partition);
-                _subReaders[partition] = CreateSubReader(partitionedTopicName);
-                readerStateTasks[partition] = 
_subReaders[partition].OnStateChangeFrom(ReaderState.Disconnected, 
_cts.Token).AsTask();
-            }
-        }
-        else
-        {
-            _subReaderIndex = 0;
-            readerStateTasks = new Task<ReaderState>[1];
-            subReaderStates = new ReaderState[1];
-            _subReaders = new SubReader<TMessage>[1];
-            _subReaders[0] = CreateSubReader(Topic);
-            readerStateTasks[0] = 
_subReaders[0].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
-        }
         _allSubReadersAreReady = true;
         _semaphoreSlim.Release();
 
         while (true)
         {
-            await Task.WhenAny(readerStateTasks).ConfigureAwait(false);
+            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
 
-            for (var i = 0; i < readerStateTasks.Length; ++i)
+            for (var i = 0; i < numberOfSubReaders; ++i)
             {
-                var task = readerStateTasks[i];
+                var task = monitoringTasks[i];
                 if (!task.IsCompleted)
                     continue;
 
                 var state = task.Result;
-                subReaderStates[i] = state;
-
-                readerStateTasks[i] = _subReaders[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
+                states[i] = state;
+                monitoringTasks[i] = _subReaders[i].OnStateChangeFrom(state, _cts.Token).AsTask();
             }
 
-            if (subReaderStates.Any(x => x == ReaderState.Faulted))
+            if (!_isPartitionedTopic)
+                _state.SetState(states[0]);
+            else if (states.Any(x => x == ReaderState.Faulted))
                 _state.SetState(ReaderState.Faulted);
-            else if (subReaderStates.All(x => x == ReaderState.Connected))
+            else if (states.All(x => x == ReaderState.Connected))
                 _state.SetState(ReaderState.Connected);
-            else if (subReaderStates.All(x => x == ReaderState.Disconnected))
-                _state.SetState(ReaderState.Disconnected);
-            else if (subReaderStates.All(x => x == 
ReaderState.ReachedEndOfTopic))
+            else if (states.All(x => x == ReaderState.ReachedEndOfTopic))
                 _state.SetState(ReaderState.ReachedEndOfTopic);
-            else if (subReaderStates.Length > 1) //States for a partitioned 
topic
-            {
-                if (subReaderStates.Any(x => x == ReaderState.Connected) && 
subReaderStates.Any(x => x == ReaderState.Disconnected))
-                    _state.SetState(ReaderState.PartiallyConnected);
-            }
+            else if (states.All(x => x == ReaderState.Disconnected))
+                _state.SetState(ReaderState.Disconnected);
+            else
+                _state.SetState(ReaderState.PartiallyConnected);
         }
     }
 
@@ -176,7 +160,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             return await 
_subReaders[_subReaderIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false);
         throw new NotSupportedException("GetLastMessageId can't be used on 
partitioned topics. Please use GetLastMessageIds");
     }
@@ -185,7 +169,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             return new[] { await 
_subReaders[_subReaderIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)
 };
 
         var getLastMessageIdsTasks = new 
List<Task<MessageId>>(_numberOfPartitions);
@@ -212,7 +196,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
             return await 
_subReaders[_subReaderIndex].Receive(cancellationToken).ConfigureAwait(false);
 
         var iterations = 0;
@@ -223,24 +207,24 @@ public sealed class Reader<TMessage> : IReader<TMessage>
             if (_subReaderIndex == _subReaders.Length)
                 _subReaderIndex = 0;
 
-            var receiveTask = _receiveTaskQueueForSubReaders[_subReaderIndex];
+            var receiveTask = _receiveTasks[_subReaderIndex];
             if (receiveTask == _emptyTaskCompletionSource.Task)
             {
                 var receiveTaskValueTask = 
_subReaders[_subReaderIndex].Receive(cancellationToken);
                 if (receiveTaskValueTask.IsCompleted)
                     return receiveTaskValueTask.Result;
-                _receiveTaskQueueForSubReaders[_subReaderIndex] = 
receiveTaskValueTask.AsTask();
+                _receiveTasks[_subReaderIndex] = receiveTaskValueTask.AsTask();
             }
             else
             {
                 if (receiveTask.IsCompleted)
                 {
-                    _receiveTaskQueueForSubReaders[_subReaderIndex] = 
_emptyTaskCompletionSource.Task;
+                    _receiveTasks[_subReaderIndex] = _emptyTaskCompletionSource.Task;
                     return receiveTask.Result;
                 }
             }
             if (iterations == _subReaders.Length)
-                await 
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+                await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
         }
     }
 
@@ -248,7 +232,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await _subReaders[_subReaderIndex].Seek(messageId, 
cancellationToken).ConfigureAwait(false);
             return;
@@ -267,7 +251,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitioned)
+        if (!_isPartitionedTopic)
         {
             await _subReaders[_subReaderIndex].Seek(publishTime, 
cancellationToken).ConfigureAwait(false);
             return;

Reply via email to