This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53a3b61  Fix disposal of process not terminating (#160)
53a3b61 is described below

commit 53a3b61e02048f41757a7dad1c0407ee84fdf72e
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jun 2 08:05:04 2023 +0200

    Fix disposal of process not terminating (#160)
---
 src/DotPulsar/Internal/Abstractions/Process.cs | 7 ++++---
 src/DotPulsar/Internal/ConsumerProcess.cs      | 1 -
 src/DotPulsar/Internal/ProducerProcess.cs      | 1 -
 src/DotPulsar/Internal/ReaderProcess.cs        | 1 -
 4 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index 7ff5081..a2c5158 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -21,7 +21,7 @@ using System.Threading.Tasks;
 
 public abstract class Process : IProcess
 {
-    protected readonly CancellationTokenSource CancellationTokenSource;
+    private readonly CancellationTokenSource _cancellationTokenSource;
     protected readonly AsyncQueue<Func<CancellationToken, Task>> ActionQueue;
     private Task? _actionProcessorTask;
     protected ChannelState ChannelState;
@@ -30,7 +30,7 @@ public abstract class Process : IProcess
 
     protected Process(Guid correlationId)
     {
-        CancellationTokenSource = new CancellationTokenSource();
+        _cancellationTokenSource = new CancellationTokenSource();
         ActionQueue = new AsyncQueue<Func<CancellationToken, Task>>();
         ChannelState = ChannelState.Disconnected;
         ExecutorState = ExecutorState.Ok;
@@ -42,12 +42,13 @@ public abstract class Process : IProcess
 
     public void Start()
     {
-        _actionProcessorTask = ProcessActions(CancellationTokenSource.Token);
+        _actionProcessorTask = ProcessActions(_cancellationTokenSource.Token);
         CalculateState();
     }
 
     public virtual async ValueTask DisposeAsync()
     {
+        _cancellationTokenSource.Cancel();
         if (_actionProcessorTask != null)
         {
             await _actionProcessorTask.ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index d033f2d..aa0f9f6 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -39,7 +39,6 @@ public sealed class ConsumerProcess : Process
     {
         await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ConsumerState.Closed);
-        CancellationTokenSource.Cancel();
         await _consumer.DisposeAsync().ConfigureAwait(false);
     }
 
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 474749e..f2db79f 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -37,7 +37,6 @@ public sealed class ProducerProcess : Process
     {
         await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ProducerState.Closed);
-        CancellationTokenSource.Cancel();
         await _producer.DisposeAsync().ConfigureAwait(false);
     }
 
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 98b6407..d904710 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -36,7 +36,6 @@ public sealed class ReaderProcess : Process
     {
         await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ReaderState.Closed);
-        CancellationTokenSource.Cancel();
         await _reader.DisposeAsync().ConfigureAwait(false);
     }
 

Reply via email to