This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ca9ba10  Async Process (#155)
ca9ba10 is described below

commit ca9ba10d7d42f5d18a5e203e9a216a9ab456031c
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri May 26 12:47:15 2023 +0200

    Async Process (#155)
    
    * Convert reader and consumer to process actions sequentially
    
    Queue actions and process one by one
    
    * Refactor to reduce duplication
    
    Move process queue to abstract Process class
---
 src/DotPulsar/Internal/Abstractions/Process.cs | 32 ++++++++++++++++++++++++--
 src/DotPulsar/Internal/ConsumerProcess.cs      | 17 ++++++--------
 src/DotPulsar/Internal/ProducerProcess.cs      | 29 ++++-------------------
 src/DotPulsar/Internal/ReaderProcess.cs        | 17 ++++++--------
 4 files changed, 48 insertions(+), 47 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index d509bfd..7ff5081 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -22,6 +22,8 @@ using System.Threading.Tasks;
 public abstract class Process : IProcess
 {
     protected readonly CancellationTokenSource CancellationTokenSource;
+    protected readonly AsyncQueue<Func<CancellationToken, Task>> ActionQueue;
+    private Task? _actionProcessorTask;
     protected ChannelState ChannelState;
     protected ExecutorState ExecutorState;
     protected Exception? Exception;
@@ -29,6 +31,7 @@ public abstract class Process : IProcess
     protected Process(Guid correlationId)
     {
         CancellationTokenSource = new CancellationTokenSource();
+        ActionQueue = new AsyncQueue<Func<CancellationToken, Task>>();
         ChannelState = ChannelState.Disconnected;
         ExecutorState = ExecutorState.Ok;
         CorrelationId = correlationId;
@@ -38,9 +41,18 @@ public abstract class Process : IProcess
     protected ulong? TopicEpoch { get; private set; }
 
     public void Start()
-        => CalculateState();
+    {
+        _actionProcessorTask = ProcessActions(CancellationTokenSource.Token);
+        CalculateState();
+    }
 
-    public abstract ValueTask DisposeAsync();
+    public virtual async ValueTask DisposeAsync()
+    {
+        if (_actionProcessorTask != null)
+        {
+            await _actionProcessorTask.ConfigureAwait(false);
+        }
+    }
 
     public void Handle(IEvent e)
     {
@@ -85,4 +97,20 @@ public abstract class Process : IProcess
     }
 
     protected abstract void CalculateState();
+
+    private async Task ProcessActions(CancellationToken cancellationToken)
+    {
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            try
+            {
+                var func = await ActionQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+                await func.Invoke(cancellationToken).ConfigureAwait(false);
+            }
+            catch
+            {
+                // Ignore
+            }
+        }
+    }
 }
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index 1246a93..d033f2d 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -23,7 +23,6 @@ public sealed class ConsumerProcess : Process
     private readonly IStateManager<ConsumerState> _stateManager;
     private readonly IContainsChannel _consumer;
     private readonly bool _isFailoverSubscription;
-    private Task? _establishNewChannelTask;
 
     public ConsumerProcess(
         Guid correlationId,
@@ -38,6 +37,7 @@ public sealed class ConsumerProcess : Process
 
     public override async ValueTask DisposeAsync()
     {
+        await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ConsumerState.Closed);
         CancellationTokenSource.Cancel();
         await _consumer.DisposeAsync().ConfigureAwait(false);
@@ -52,7 +52,7 @@ public sealed class ConsumerProcess : Process
         {
             var formerState = _stateManager.SetState(ConsumerState.Faulted);
             if (formerState != ConsumerState.Faulted)
-                Task.Run(() => _consumer.ChannelFaulted(Exception!));
+                ActionQueue.Enqueue(async _ => await _consumer.ChannelFaulted(Exception!) );
             return;
         }
 
@@ -67,7 +67,11 @@ public sealed class ConsumerProcess : Process
             case ChannelState.ClosedByServer:
             case ChannelState.Disconnected:
                 _stateManager.SetState(ConsumerState.Disconnected);
-                EstablishNewChannel();
+                ActionQueue.Enqueue(async x =>
+                {
+                    await _consumer.CloseChannel(x).ConfigureAwait(false);
+                    await _consumer.EstablishNewChannel(x).ConfigureAwait(false);
+                });
                 return;
             case ChannelState.Connected:
                 if (!_isFailoverSubscription)
@@ -81,11 +85,4 @@ public sealed class ConsumerProcess : Process
                 return;
         }
     }
-
-    private void EstablishNewChannel()
-    {
-        var token = CancellationTokenSource.Token;
-        if (_establishNewChannelTask is null || _establishNewChannelTask.IsCompleted)
-            _establishNewChannelTask = Task.Run(() => _consumer.EstablishNewChannel(token).ConfigureAwait(false), token);
-    }
 }
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index d5f12d3..474749e 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -17,15 +17,12 @@ namespace DotPulsar.Internal;
 using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using System;
-using System.Threading;
 using System.Threading.Tasks;
 
 public sealed class ProducerProcess : Process
 {
     private readonly IStateManager<ProducerState> _stateManager;
     private readonly IContainsProducerChannel _producer;
-    private readonly AsyncQueue<Func<CancellationToken, Task>> _actionQueue;
-    private readonly Task _actionProcessorTask;
 
     public ProducerProcess(
         Guid correlationId,
@@ -34,13 +31,11 @@ public sealed class ProducerProcess : Process
     {
         _stateManager = stateManager;
         _producer = producer;
-        _actionQueue = new AsyncQueue<Func<CancellationToken, Task>>();
-        _actionProcessorTask = ProcessActions(CancellationTokenSource.Token);
     }
 
     public override async ValueTask DisposeAsync()
     {
-        await _actionProcessorTask.ConfigureAwait(false);
+        await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ProducerState.Closed);
         CancellationTokenSource.Cancel();
         await _producer.DisposeAsync().ConfigureAwait(false);
@@ -56,7 +51,7 @@ public sealed class ProducerProcess : Process
             ProducerState newState = Exception! is ProducerFencedException ? ProducerState.Fenced : ProducerState.Faulted;
             var formerState = _stateManager.SetState(newState);
             if (formerState != ProducerState.Faulted && formerState != ProducerState.Fenced)
-                _actionQueue.Enqueue(async _ => await _producer.ChannelFaulted(Exception!));
+                ActionQueue.Enqueue(async _ => await _producer.ChannelFaulted(Exception!));
             return;
         }
 
@@ -65,14 +60,14 @@ public sealed class ProducerProcess : Process
             case ChannelState.ClosedByServer:
             case ChannelState.Disconnected:
                 _stateManager.SetState(ProducerState.Disconnected);
-                _actionQueue.Enqueue(async x =>
+                ActionQueue.Enqueue(async x =>
                 {
                     await _producer.CloseChannel(x).ConfigureAwait(false);
                     await _producer.EstablishNewChannel(x).ConfigureAwait(false);
                 });
                 return;
             case ChannelState.Connected:
-                _actionQueue.Enqueue(async x =>
+                ActionQueue.Enqueue(async x =>
                 {
                     await _producer.ActivateChannel(TopicEpoch, x).ConfigureAwait(false);
                     _stateManager.SetState(ProducerState.Connected);
@@ -83,20 +78,4 @@ public sealed class ProducerProcess : Process
                 return;
         }
     }
-
-    private async Task ProcessActions(CancellationToken cancellationToken)
-    {
-        while (!cancellationToken.IsCancellationRequested)
-        {
-            try
-            {
-                var func = await _actionQueue.Dequeue(cancellationToken).ConfigureAwait(false);
-                await func.Invoke(cancellationToken).ConfigureAwait(false);
-            }
-            catch
-            {
-                // Ignore
-            }
-        }
-    }
 }
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 1076d84..e421c5f 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -22,7 +22,6 @@ public sealed class ReaderProcess : Process
 {
     private readonly IStateManager<ReaderState> _stateManager;
     private readonly IContainsChannel _reader;
-    private Task? _establishNewChannelTask;
 
     public ReaderProcess(
         Guid correlationId,
@@ -35,6 +34,7 @@ public sealed class ReaderProcess : Process
 
     public override async ValueTask DisposeAsync()
     {
+        await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ReaderState.Closed);
         CancellationTokenSource.Cancel();
         await _reader.DisposeAsync().ConfigureAwait(false);
@@ -50,7 +50,7 @@ public sealed class ReaderProcess : Process
             _stateManager.SetState(ReaderState.Faulted);
             var formerState = _stateManager.SetState(ReaderState.Faulted);
             if (formerState != ReaderState.Faulted)
-                Task.Run(() => _reader.ChannelFaulted(Exception!));
+                ActionQueue.Enqueue(async _ => await _reader.ChannelFaulted(Exception!) );
             return;
         }
 
@@ -59,7 +59,11 @@ public sealed class ReaderProcess : Process
             case ChannelState.ClosedByServer:
             case ChannelState.Disconnected:
                 _stateManager.SetState(ReaderState.Disconnected);
-                EstablishNewChannel();
+                ActionQueue.Enqueue(async x =>
+                {
+                    await _reader.CloseChannel(x).ConfigureAwait(false);
+                    await _reader.EstablishNewChannel(x).ConfigureAwait(false);
+                });
                 return;
             case ChannelState.Connected:
                 _stateManager.SetState(ReaderState.Connected);
@@ -69,11 +73,4 @@ public sealed class ReaderProcess : Process
                 return;
         }
     }
-
-    private void EstablishNewChannel()
-    {
-        var token = CancellationTokenSource.Token;
-        if (_establishNewChannelTask is null || _establishNewChannelTask.IsCompleted)
-            _establishNewChannelTask = Task.Run(() => _reader.EstablishNewChannel(token).ConfigureAwait(false), token);
-    }
 }

Reply via email to