This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53a3b61 Fix disposal of process not terminating (#160)
53a3b61 is described below
commit 53a3b61e02048f41757a7dad1c0407ee84fdf72e
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jun 2 08:05:04 2023 +0200
Fix disposal of process not terminating (#160)
---
src/DotPulsar/Internal/Abstractions/Process.cs | 7 ++++---
src/DotPulsar/Internal/ConsumerProcess.cs | 1 -
src/DotPulsar/Internal/ProducerProcess.cs | 1 -
src/DotPulsar/Internal/ReaderProcess.cs | 1 -
4 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index 7ff5081..a2c5158 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -21,7 +21,7 @@ using System.Threading.Tasks;
public abstract class Process : IProcess
{
- protected readonly CancellationTokenSource CancellationTokenSource;
+ private readonly CancellationTokenSource _cancellationTokenSource;
protected readonly AsyncQueue<Func<CancellationToken, Task>> ActionQueue;
private Task? _actionProcessorTask;
protected ChannelState ChannelState;
@@ -30,7 +30,7 @@ public abstract class Process : IProcess
protected Process(Guid correlationId)
{
- CancellationTokenSource = new CancellationTokenSource();
+ _cancellationTokenSource = new CancellationTokenSource();
ActionQueue = new AsyncQueue<Func<CancellationToken, Task>>();
ChannelState = ChannelState.Disconnected;
ExecutorState = ExecutorState.Ok;
@@ -42,12 +42,13 @@ public abstract class Process : IProcess
public void Start()
{
- _actionProcessorTask = ProcessActions(CancellationTokenSource.Token);
+ _actionProcessorTask = ProcessActions(_cancellationTokenSource.Token);
CalculateState();
}
public virtual async ValueTask DisposeAsync()
{
+ _cancellationTokenSource.Cancel();
if (_actionProcessorTask != null)
{
await _actionProcessorTask.ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index d033f2d..aa0f9f6 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -39,7 +39,6 @@ public sealed class ConsumerProcess : Process
{
await base.DisposeAsync().ConfigureAwait(false);
_stateManager.SetState(ConsumerState.Closed);
- CancellationTokenSource.Cancel();
await _consumer.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 474749e..f2db79f 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -37,7 +37,6 @@ public sealed class ProducerProcess : Process
{
await base.DisposeAsync().ConfigureAwait(false);
_stateManager.SetState(ProducerState.Closed);
- CancellationTokenSource.Cancel();
await _producer.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 98b6407..d904710 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -36,7 +36,6 @@ public sealed class ReaderProcess : Process
{
await base.DisposeAsync().ConfigureAwait(false);
_stateManager.SetState(ReaderState.Closed);
- CancellationTokenSource.Cancel();
await _reader.DisposeAsync().ConfigureAwait(false);
}