This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new db32298 State management clean up
db32298 is described below
commit db32298a5598490a29ba5f4e68ef0ff6c0694f89
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Aug 23 15:13:56 2023 +0200
State management clean up
---
src/DotPulsar/Internal/Consumer.cs | 135 +++++++++++++++++--------------------
src/DotPulsar/Internal/Reader.cs | 98 +++++++++++----------------
2 files changed, 101 insertions(+), 132 deletions(-)
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 71f6cee..3a7c405 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -38,9 +38,9 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
private SubConsumer<TMessage>[] _subConsumers;
private bool _allSubConsumersAreReady;
private int _isDisposed;
- private bool _isPartitioned;
+ private bool _isPartitionedTopic;
private int _numberOfPartitions;
- private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubConsumers;
+ private Task<IMessage<TMessage>>[] _receiveTasks;
private int _subConsumerIndex;
private Exception? _faultException;
@@ -59,7 +59,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
ServiceUrl = serviceUrl;
SubscriptionName = consumerOptions.SubscriptionName;
Topic = consumerOptions.Topic;
- _receiveTaskQueueForSubConsumers = Array.Empty<Task<IMessage<TMessage>>>();
+ _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
_cts = new CancellationTokenSource();
_exceptionHandler = exceptionHandler;
_semaphoreSlim = new SemaphoreSlim(1);
@@ -68,7 +68,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_consumerOptions = consumerOptions;
_connectionPool = connectionPool;
_exceptionHandler = exceptionHandler;
- _isPartitioned = false;
+ _isPartitionedTopic = false;
_allSubConsumersAreReady = false;
_isDisposed = 0;
_subConsumers = null!;
@@ -96,78 +96,59 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
private async Task Monitor()
{
- await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
+ await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
_numberOfPartitions = Convert.ToInt32(await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
- _isPartitioned = _numberOfPartitions != 0;
- _receiveTaskQueueForSubConsumers = new
Task<IMessage<TMessage>>[_numberOfPartitions];
- var consumerStateTasks = new Task<ConsumerState>[_numberOfPartitions];
- ConsumerState[] subConsumerStates;
-
- for (var i = 0; i < _receiveTaskQueueForSubConsumers.Length; i++)
+ _isPartitionedTopic = _numberOfPartitions != 0;
+ var numberOfSubConsumers = _isPartitionedTopic ? _numberOfPartitions : 1;
+ _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubConsumers];
+ _subConsumers = new SubConsumer<TMessage>[numberOfSubConsumers];
+ var monitoringTasks = new Task<ConsumerState>[numberOfSubConsumers];
+ var states = new ConsumerState[numberOfSubConsumers];
+ _subConsumerIndex = _isPartitionedTopic ? -1 : 0;
+
+ for (var i = 0; i < numberOfSubConsumers; i++)
{
- _receiveTaskQueueForSubConsumers[i] =
_emptyTaskCompletionSource.Task;
+ _receiveTasks[i] = _emptyTaskCompletionSource.Task;
+ var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
+ _subConsumers[i] = CreateSubConsumer(topicName);
+ monitoringTasks[i] = _subConsumers[i].OnStateChangeFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
}
- if (_isPartitioned)
- {
- _subConsumerIndex = -1;
- _subConsumers = new SubConsumer<TMessage>[_numberOfPartitions];
- subConsumerStates = new ConsumerState[_numberOfPartitions];
- for (var partition = 0; partition < _numberOfPartitions;
partition++)
- {
- var partitionedTopicName = GetPartitionedTopicName(partition);
- _subConsumers[partition] =
CreateSubConsumer(partitionedTopicName);
- consumerStateTasks[partition] =
_subConsumers[partition].OnStateChangeFrom(ConsumerState.Disconnected,
_cts.Token).AsTask();
- }
- }
- else
- {
- _subConsumerIndex = 0;
- consumerStateTasks = new Task<ConsumerState>[1];
- subConsumerStates = new ConsumerState[1];
- _subConsumers = new SubConsumer<TMessage>[1];
- _subConsumers[0] = CreateSubConsumer(Topic);
- consumerStateTasks[0] =
_subConsumers[0].OnStateChangeFrom(ConsumerState.Disconnected,
_cts.Token).AsTask();
- }
_allSubConsumersAreReady = true;
_semaphoreSlim.Release();
while (true)
{
- await Task.WhenAny(consumerStateTasks).ConfigureAwait(false);
+ await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
- for (var i = 0; i < consumerStateTasks.Length; ++i)
+ for (var i = 0; i < numberOfSubConsumers; ++i)
{
- var task = consumerStateTasks[i];
+ var task = monitoringTasks[i];
if (!task.IsCompleted)
continue;
var state = task.Result;
- subConsumerStates[i] = state;
-
- consumerStateTasks[i] =
_subConsumers[i].OnStateChangeFrom(state, _cts.Token).AsTask();
+ states[i] = state;
+ monitoringTasks[i] = _subConsumers[i].OnStateChangeFrom(state, _cts.Token).AsTask();
}
- if (subConsumerStates.Any(x => x == ConsumerState.Faulted))
+ if (!_isPartitionedTopic)
+ _state.SetState(states[0]);
+ else if (states.Any(x => x == ConsumerState.Faulted))
_state.SetState(ConsumerState.Faulted);
- else if (subConsumerStates.All(x => x == ConsumerState.Active))
+ else if (states.All(x => x == ConsumerState.Active))
_state.SetState(ConsumerState.Active);
- else if (subConsumerStates.All(x => x ==
ConsumerState.Disconnected))
- _state.SetState(ConsumerState.Disconnected);
- else if (subConsumerStates.All(x => x == ConsumerState.Inactive))
+ else if (states.All(x => x == ConsumerState.Inactive))
_state.SetState(ConsumerState.Inactive);
- else if (subConsumerStates.All(x => x ==
ConsumerState.ReachedEndOfTopic))
+ else if (states.All(x => x == ConsumerState.ReachedEndOfTopic))
_state.SetState(ConsumerState.ReachedEndOfTopic);
- else if (subConsumerStates.Length > 1) //States for a partitioned
topic
- {
- if (subConsumerStates.Any(x => x == ConsumerState.Active) &&
subConsumerStates.Any(x => x == ConsumerState.Disconnected) ||
- subConsumerStates.Any(x => x ==
ConsumerState.Disconnected) && subConsumerStates.Any(x => x ==
ConsumerState.Inactive) ||
- subConsumerStates.Any(x => x == ConsumerState.Inactive) &&
subConsumerStates.Any(x => x == ConsumerState.Active) &&
subConsumerStates.Any(x => x == ConsumerState.Disconnected))
- _state.SetState(ConsumerState.PartiallyConnected);
- if (subConsumerStates.Any(x => x == ConsumerState.Active) &&
subConsumerStates.Any(x => x == ConsumerState.Inactive))
- _state.SetState(ConsumerState.Inactive);
- }
+ else if (states.All(x => x == ConsumerState.Disconnected))
+ _state.SetState(ConsumerState.Disconnected);
+ else if (states.Any(x => x == ConsumerState.Disconnected))
+ _state.SetState(ConsumerState.PartiallyConnected);
+ else
+ _state.SetState(ConsumerState.Inactive);
}
}
@@ -203,7 +184,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
return await _subConsumers[_subConsumerIndex].Receive(cancellationToken).ConfigureAwait(false);
var iterations = 0;
@@ -214,24 +195,24 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
if (_subConsumerIndex == _subConsumers.Length)
_subConsumerIndex = 0;
- var receiveTask =
_receiveTaskQueueForSubConsumers[_subConsumerIndex];
+ var receiveTask = _receiveTasks[_subConsumerIndex];
if (receiveTask == _emptyTaskCompletionSource.Task)
{
var receiveTaskValueTask = _subConsumers[_subConsumerIndex].Receive(cancellationToken);
if (receiveTaskValueTask.IsCompleted)
return receiveTaskValueTask.Result;
- _receiveTaskQueueForSubConsumers[_subConsumerIndex] =
receiveTaskValueTask.AsTask();
+ _receiveTasks[_subConsumerIndex] = receiveTaskValueTask.AsTask();
}
else
{
if (receiveTask.IsCompleted)
{
- _receiveTaskQueueForSubConsumers[_subConsumerIndex] =
_emptyTaskCompletionSource.Task;
+ _receiveTasks[_subConsumerIndex] = _emptyTaskCompletionSource.Task;
return receiveTask.Result;
}
}
if (iterations == _subConsumers.Length)
- await
Task.WhenAny(_receiveTaskQueueForSubConsumers).ConfigureAwait(false);
+ await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
}
@@ -239,7 +220,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
await _subConsumers[_subConsumerIndex].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
else
await _subConsumers[messageId.Partition].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
@@ -249,7 +230,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
await
_subConsumers[_subConsumerIndex].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
else
await
_subConsumers[messageId.Partition].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
@@ -259,7 +240,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(messageIds,
cancellationToken).ConfigureAwait(false);
return;
@@ -297,7 +278,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
return;
@@ -313,26 +294,30 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await
_subConsumers[_subConsumerIndex].Unsubscribe(cancellationToken).ConfigureAwait(false);
- return;
}
-
- var unsubscribeTasks = new List<Task>(_numberOfPartitions);
- foreach (var subConsumer in _subConsumers)
+ else
{
- var getLastMessageIdTask =
subConsumer.Unsubscribe(cancellationToken);
- unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
+ var unsubscribeTasks = new List<Task>(_numberOfPartitions);
+ foreach (var subConsumer in _subConsumers)
+ {
+ var getLastMessageIdTask = subConsumer.Unsubscribe(cancellationToken);
+ unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
+ }
+
+ await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false);
}
- await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false);
+
+ _state.SetState(ConsumerState.Unsubscribed);
}
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await _subConsumers[_subConsumerIndex].Seek(messageId,
cancellationToken).ConfigureAwait(false);
return;
@@ -351,7 +336,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await _subConsumers[_subConsumerIndex].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
return;
@@ -371,7 +356,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
return await
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false);
throw new NotSupportedException("GetLastMessageId can't be used on partitioned topics. Please use GetLastMessageIds");
@@ -381,7 +366,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
return new[] { await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false) };
var getLastMessageIdsTasks = new
List<Task<MessageId>>(_numberOfPartitions);
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 8a30ee0..ca0aefc 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -37,9 +37,9 @@ public sealed class Reader<TMessage> : IReader<TMessage>
private readonly SemaphoreSlim _semaphoreSlim;
private SubReader<TMessage>[] _subReaders;
private bool _allSubReadersAreReady;
- private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubReaders;
+ private Task<IMessage<TMessage>>[] _receiveTasks;
private int _subReaderIndex;
- private bool _isPartitioned;
+ private bool _isPartitionedTopic;
private int _numberOfPartitions;
private int _isDisposed;
private Exception? _faultException;
@@ -62,7 +62,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
_exceptionHandler = exceptionHandler;
_semaphoreSlim = new SemaphoreSlim(1);
_state = CreateStateManager();
- _receiveTaskQueueForSubReaders =
Array.Empty<Task<IMessage<TMessage>>>();
+ _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
_cts = new CancellationTokenSource();
_executor = new Executor(Guid.Empty, _processManager,
_exceptionHandler);
_isDisposed = 0;
@@ -91,71 +91,55 @@ public sealed class Reader<TMessage> : IReader<TMessage>
private async Task Monitor()
{
- await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
- _numberOfPartitions = Convert.ToInt32(await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
- _isPartitioned = _numberOfPartitions != 0;
- _receiveTaskQueueForSubReaders = new
Task<IMessage<TMessage>>[_numberOfPartitions];
- var readerStateTasks = new Task<ReaderState>[_numberOfPartitions];
- ReaderState[] subReaderStates;
+ await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
- for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++)
+ _numberOfPartitions = Convert.ToInt32(await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
+ _isPartitionedTopic = _numberOfPartitions != 0;
+ var numberOfSubReaders = _isPartitionedTopic ? _numberOfPartitions : 1;
+ _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubReaders];
+ _subReaders = new SubReader<TMessage>[numberOfSubReaders];
+ var monitoringTasks = new Task<ReaderState>[numberOfSubReaders];
+ var states = new ReaderState[numberOfSubReaders];
+ _subReaderIndex = _isPartitionedTopic ? -1 : 0;
+
+ for (var i = 0; i < numberOfSubReaders; i++)
{
- _receiveTaskQueueForSubReaders[i] =
_emptyTaskCompletionSource.Task;
+ _receiveTasks[i] = _emptyTaskCompletionSource.Task;
+ var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
+ _subReaders[i] = CreateSubReader(topicName);
+ monitoringTasks[i] = _subReaders[i].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
}
- if (_isPartitioned)
- {
- _subReaderIndex = -1;
- _subReaders = new SubReader<TMessage>[_numberOfPartitions];
- subReaderStates = new ReaderState[_numberOfPartitions];
- for (var partition = 0; partition < _numberOfPartitions;
partition++)
- {
- var partitionedTopicName = GetPartitionedTopicName(partition);
- _subReaders[partition] = CreateSubReader(partitionedTopicName);
- readerStateTasks[partition] =
_subReaders[partition].OnStateChangeFrom(ReaderState.Disconnected,
_cts.Token).AsTask();
- }
- }
- else
- {
- _subReaderIndex = 0;
- readerStateTasks = new Task<ReaderState>[1];
- subReaderStates = new ReaderState[1];
- _subReaders = new SubReader<TMessage>[1];
- _subReaders[0] = CreateSubReader(Topic);
- readerStateTasks[0] =
_subReaders[0].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
- }
_allSubReadersAreReady = true;
_semaphoreSlim.Release();
while (true)
{
- await Task.WhenAny(readerStateTasks).ConfigureAwait(false);
+ await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
- for (var i = 0; i < readerStateTasks.Length; ++i)
+ for (var i = 0; i < numberOfSubReaders; ++i)
{
- var task = readerStateTasks[i];
+ var task = monitoringTasks[i];
if (!task.IsCompleted)
continue;
var state = task.Result;
- subReaderStates[i] = state;
-
- readerStateTasks[i] = _subReaders[i].OnStateChangeFrom(state,
_cts.Token).AsTask();
+ states[i] = state;
+ monitoringTasks[i] = _subReaders[i].OnStateChangeFrom(state, _cts.Token).AsTask();
}
- if (subReaderStates.Any(x => x == ReaderState.Faulted))
+ if (!_isPartitionedTopic)
+ _state.SetState(states[0]);
+ else if (states.Any(x => x == ReaderState.Faulted))
_state.SetState(ReaderState.Faulted);
- else if (subReaderStates.All(x => x == ReaderState.Connected))
+ else if (states.All(x => x == ReaderState.Connected))
_state.SetState(ReaderState.Connected);
- else if (subReaderStates.All(x => x == ReaderState.Disconnected))
- _state.SetState(ReaderState.Disconnected);
- else if (subReaderStates.All(x => x ==
ReaderState.ReachedEndOfTopic))
+ else if (states.All(x => x == ReaderState.ReachedEndOfTopic))
_state.SetState(ReaderState.ReachedEndOfTopic);
- else if (subReaderStates.Length > 1) //States for a partitioned
topic
- {
- if (subReaderStates.Any(x => x == ReaderState.Connected) &&
subReaderStates.Any(x => x == ReaderState.Disconnected))
- _state.SetState(ReaderState.PartiallyConnected);
- }
+ else if (states.All(x => x == ReaderState.Disconnected))
+ _state.SetState(ReaderState.Disconnected);
+ else
+ _state.SetState(ReaderState.PartiallyConnected);
}
}
@@ -176,7 +160,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
return await
_subReaders[_subReaderIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false);
throw new NotSupportedException("GetLastMessageId can't be used on partitioned topics. Please use GetLastMessageIds");
}
@@ -185,7 +169,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
return new[] { await _subReaders[_subReaderIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false) };
var getLastMessageIdsTasks = new
List<Task<MessageId>>(_numberOfPartitions);
@@ -212,7 +196,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
return await
_subReaders[_subReaderIndex].Receive(cancellationToken).ConfigureAwait(false);
var iterations = 0;
@@ -223,24 +207,24 @@ public sealed class Reader<TMessage> : IReader<TMessage>
if (_subReaderIndex == _subReaders.Length)
_subReaderIndex = 0;
- var receiveTask = _receiveTaskQueueForSubReaders[_subReaderIndex];
+ var receiveTask = _receiveTasks[_subReaderIndex];
if (receiveTask == _emptyTaskCompletionSource.Task)
{
var receiveTaskValueTask =
_subReaders[_subReaderIndex].Receive(cancellationToken);
if (receiveTaskValueTask.IsCompleted)
return receiveTaskValueTask.Result;
- _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ _receiveTasks[_subReaderIndex] = receiveTaskValueTask.AsTask();
}
else
{
if (receiveTask.IsCompleted)
{
- _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ _receiveTasks[_subReaderIndex] = _emptyTaskCompletionSource.Task;
return receiveTask.Result;
}
}
if (iterations == _subReaders.Length)
- await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
}
@@ -248,7 +232,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await _subReaders[_subReaderIndex].Seek(messageId,
cancellationToken).ConfigureAwait(false);
return;
@@ -267,7 +251,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitioned)
+ if (!_isPartitionedTopic)
{
await _subReaders[_subReaderIndex].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
return;