This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ca9ba10 Async Process (#155)
ca9ba10 is described below
commit ca9ba10d7d42f5d18a5e203e9a216a9ab456031c
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri May 26 12:47:15 2023 +0200
Async Process (#155)
* Convert reader and consumer to process actions sequentially
Queue actions and process one by one
* Refactor to reduce duplication
Move process queue to abstract Process class
---
src/DotPulsar/Internal/Abstractions/Process.cs | 32 ++++++++++++++++++++++++--
src/DotPulsar/Internal/ConsumerProcess.cs | 17 ++++++--------
src/DotPulsar/Internal/ProducerProcess.cs | 29 ++++-------------------
src/DotPulsar/Internal/ReaderProcess.cs | 17 ++++++--------
4 files changed, 48 insertions(+), 47 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index d509bfd..7ff5081 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -22,6 +22,8 @@ using System.Threading.Tasks;
public abstract class Process : IProcess
{
protected readonly CancellationTokenSource CancellationTokenSource;
+ protected readonly AsyncQueue<Func<CancellationToken, Task>> ActionQueue;
+ private Task? _actionProcessorTask;
protected ChannelState ChannelState;
protected ExecutorState ExecutorState;
protected Exception? Exception;
@@ -29,6 +31,7 @@ public abstract class Process : IProcess
protected Process(Guid correlationId)
{
CancellationTokenSource = new CancellationTokenSource();
+ ActionQueue = new AsyncQueue<Func<CancellationToken, Task>>();
ChannelState = ChannelState.Disconnected;
ExecutorState = ExecutorState.Ok;
CorrelationId = correlationId;
@@ -38,9 +41,18 @@ public abstract class Process : IProcess
protected ulong? TopicEpoch { get; private set; }
public void Start()
- => CalculateState();
+ {
+ _actionProcessorTask = ProcessActions(CancellationTokenSource.Token);
+ CalculateState();
+ }
- public abstract ValueTask DisposeAsync();
+ public virtual async ValueTask DisposeAsync()
+ {
+ if (_actionProcessorTask != null)
+ {
+ await _actionProcessorTask.ConfigureAwait(false);
+ }
+ }
public void Handle(IEvent e)
{
@@ -85,4 +97,20 @@ public abstract class Process : IProcess
}
protected abstract void CalculateState();
+
+ private async Task ProcessActions(CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ var func = await
ActionQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+ await func.Invoke(cancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
}
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index 1246a93..d033f2d 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -23,7 +23,6 @@ public sealed class ConsumerProcess : Process
private readonly IStateManager<ConsumerState> _stateManager;
private readonly IContainsChannel _consumer;
private readonly bool _isFailoverSubscription;
- private Task? _establishNewChannelTask;
public ConsumerProcess(
Guid correlationId,
@@ -38,6 +37,7 @@ public sealed class ConsumerProcess : Process
public override async ValueTask DisposeAsync()
{
+ await base.DisposeAsync().ConfigureAwait(false);
_stateManager.SetState(ConsumerState.Closed);
CancellationTokenSource.Cancel();
await _consumer.DisposeAsync().ConfigureAwait(false);
@@ -52,7 +52,7 @@ public sealed class ConsumerProcess : Process
{
var formerState = _stateManager.SetState(ConsumerState.Faulted);
if (formerState != ConsumerState.Faulted)
- Task.Run(() => _consumer.ChannelFaulted(Exception!));
+ ActionQueue.Enqueue(async _ => await
_consumer.ChannelFaulted(Exception!) );
return;
}
@@ -67,7 +67,11 @@ public sealed class ConsumerProcess : Process
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
_stateManager.SetState(ConsumerState.Disconnected);
- EstablishNewChannel();
+ ActionQueue.Enqueue(async x =>
+ {
+ await _consumer.CloseChannel(x).ConfigureAwait(false);
+ await
_consumer.EstablishNewChannel(x).ConfigureAwait(false);
+ });
return;
case ChannelState.Connected:
if (!_isFailoverSubscription)
@@ -81,11 +85,4 @@ public sealed class ConsumerProcess : Process
return;
}
}
-
- private void EstablishNewChannel()
- {
- var token = CancellationTokenSource.Token;
- if (_establishNewChannelTask is null ||
_establishNewChannelTask.IsCompleted)
- _establishNewChannelTask = Task.Run(() =>
_consumer.EstablishNewChannel(token).ConfigureAwait(false), token);
- }
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index d5f12d3..474749e 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -17,15 +17,12 @@ namespace DotPulsar.Internal;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using System;
-using System.Threading;
using System.Threading.Tasks;
public sealed class ProducerProcess : Process
{
private readonly IStateManager<ProducerState> _stateManager;
private readonly IContainsProducerChannel _producer;
- private readonly AsyncQueue<Func<CancellationToken, Task>> _actionQueue;
- private readonly Task _actionProcessorTask;
public ProducerProcess(
Guid correlationId,
@@ -34,13 +31,11 @@ public sealed class ProducerProcess : Process
{
_stateManager = stateManager;
_producer = producer;
- _actionQueue = new AsyncQueue<Func<CancellationToken, Task>>();
- _actionProcessorTask = ProcessActions(CancellationTokenSource.Token);
}
public override async ValueTask DisposeAsync()
{
- await _actionProcessorTask.ConfigureAwait(false);
+ await base.DisposeAsync().ConfigureAwait(false);
_stateManager.SetState(ProducerState.Closed);
CancellationTokenSource.Cancel();
await _producer.DisposeAsync().ConfigureAwait(false);
@@ -56,7 +51,7 @@ public sealed class ProducerProcess : Process
ProducerState newState = Exception! is ProducerFencedException ?
ProducerState.Fenced : ProducerState.Faulted;
var formerState = _stateManager.SetState(newState);
if (formerState != ProducerState.Faulted && formerState !=
ProducerState.Fenced)
- _actionQueue.Enqueue(async _ => await
_producer.ChannelFaulted(Exception!));
+ ActionQueue.Enqueue(async _ => await
_producer.ChannelFaulted(Exception!));
return;
}
@@ -65,14 +60,14 @@ public sealed class ProducerProcess : Process
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
_stateManager.SetState(ProducerState.Disconnected);
- _actionQueue.Enqueue(async x =>
+ ActionQueue.Enqueue(async x =>
{
await _producer.CloseChannel(x).ConfigureAwait(false);
await
_producer.EstablishNewChannel(x).ConfigureAwait(false);
});
return;
case ChannelState.Connected:
- _actionQueue.Enqueue(async x =>
+ ActionQueue.Enqueue(async x =>
{
await _producer.ActivateChannel(TopicEpoch,
x).ConfigureAwait(false);
_stateManager.SetState(ProducerState.Connected);
@@ -83,20 +78,4 @@ public sealed class ProducerProcess : Process
return;
}
}
-
- private async Task ProcessActions(CancellationToken cancellationToken)
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- try
- {
- var func = await
_actionQueue.Dequeue(cancellationToken).ConfigureAwait(false);
- await func.Invoke(cancellationToken).ConfigureAwait(false);
- }
- catch
- {
- // Ignore
- }
- }
- }
}
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 1076d84..e421c5f 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -22,7 +22,6 @@ public sealed class ReaderProcess : Process
{
private readonly IStateManager<ReaderState> _stateManager;
private readonly IContainsChannel _reader;
- private Task? _establishNewChannelTask;
public ReaderProcess(
Guid correlationId,
@@ -35,6 +34,7 @@ public sealed class ReaderProcess : Process
public override async ValueTask DisposeAsync()
{
+ await base.DisposeAsync().ConfigureAwait(false);
_stateManager.SetState(ReaderState.Closed);
CancellationTokenSource.Cancel();
await _reader.DisposeAsync().ConfigureAwait(false);
@@ -50,7 +50,7 @@ public sealed class ReaderProcess : Process
_stateManager.SetState(ReaderState.Faulted);
var formerState = _stateManager.SetState(ReaderState.Faulted);
if (formerState != ReaderState.Faulted)
- Task.Run(() => _reader.ChannelFaulted(Exception!));
+ ActionQueue.Enqueue(async _ => await
_reader.ChannelFaulted(Exception!) );
return;
}
@@ -59,7 +59,11 @@ public sealed class ReaderProcess : Process
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
_stateManager.SetState(ReaderState.Disconnected);
- EstablishNewChannel();
+ ActionQueue.Enqueue(async x =>
+ {
+ await _reader.CloseChannel(x).ConfigureAwait(false);
+ await _reader.EstablishNewChannel(x).ConfigureAwait(false);
+ });
return;
case ChannelState.Connected:
_stateManager.SetState(ReaderState.Connected);
@@ -69,11 +73,4 @@ public sealed class ReaderProcess : Process
return;
}
}
-
- private void EstablishNewChannel()
- {
- var token = CancellationTokenSource.Token;
- if (_establishNewChannelTask is null ||
_establishNewChannelTask.IsCompleted)
- _establishNewChannelTask = Task.Run(() =>
_reader.EstablishNewChannel(token).ConfigureAwait(false), token);
- }
}