This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 656a934  Added support for grafeful shutdown while processing messages
656a934 is described below

commit 656a9348a2c12f4649a33271422a4af99dca748b
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Sep 20 16:22:58 2024 +0200

    Added support for grafeful shutdown while processing messages
---
 CHANGELOG.md                                       |  1 +
 src/DotPulsar/Internal/MessageProcessor.cs         | 22 +++++-
 src/DotPulsar/ProcessingOptions.cs                 | 16 +++++
 .../Internal/MessageProcessorTests.cs              | 81 +++++++++++++++++++++-
 4 files changed, 115 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6aa11c4..a98ab70 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/)
 ### Added
 
 - Multiple messages can now be acknowledged with 
Acknowledge(IEnumerable\<MessageId> messageIds, CancellationToken 
cancellationToken)
+- ProcessingOptions has a new ShutdownGracePeriod property for doing a 
graceful shutdown by allowing active tasks to finish 
 
 ### Changed
 
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs 
b/src/DotPulsar/Internal/MessageProcessor.cs
index caf7020..3ce7492 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -38,6 +38,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
     private readonly bool _ensureOrderedAcknowledgment;
     private readonly int _maxDegreeOfParallelism;
     private readonly int _maxMessagesPerTask;
+    private readonly TimeSpan _shutdownGracePeriod;
     private readonly TaskScheduler _taskScheduler;
 
     public MessageProcessor(
@@ -81,6 +82,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
         _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
         _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
         _maxMessagesPerTask = options.MaxMessagesPerTask;
+        _shutdownGracePeriod = options.ShutdownGracePeriod;
         _taskScheduler = options.TaskScheduler;
     }
 
@@ -92,22 +94,36 @@ public sealed class MessageProcessor<TMessage> : IDisposable
 
     public async ValueTask Process(CancellationToken cancellationToken)
     {
+        using var cts = new CancellationTokenSource();
+        using var registration = Link(cts, cancellationToken);
+
         for (var i = 1; i < _maxDegreeOfParallelism; ++i)
         {
-            StartNewProcessorTask(cancellationToken);
+            StartNewProcessorTask(cts.Token);
         }
 
         while (true)
         {
-            StartNewProcessorTask(cancellationToken);
+            if (!cancellationToken.IsCancellationRequested)
+                StartNewProcessorTask(cts.Token);
+
+            if (_processorTasks.Count == 0)
+                return;
+
             var completedTask = await 
Task.WhenAny(_processorTasks).ConfigureAwait(false);
             if (completedTask.IsFaulted)
                 
ExceptionDispatchInfo.Capture(completedTask.Exception!.InnerException!).Throw();
             _processorTasks.Remove(completedTask);
-            cancellationToken.ThrowIfCancellationRequested();
         }
     }
 
+    private CancellationTokenRegistration Link(CancellationTokenSource cts, 
CancellationToken cancellationToken)
+    {
+        return _shutdownGracePeriod != TimeSpan.Zero
+            ? cancellationToken.Register(() => 
cts.CancelAfter(_shutdownGracePeriod))
+            : cancellationToken.Register(cts.Cancel);
+    }
+
     private async ValueTask Processor(CancellationToken cancellationToken)
     {
         var messagesProcessed = 0;
diff --git a/src/DotPulsar/ProcessingOptions.cs 
b/src/DotPulsar/ProcessingOptions.cs
index 166c4c7..b1a06cc 100644
--- a/src/DotPulsar/ProcessingOptions.cs
+++ b/src/DotPulsar/ProcessingOptions.cs
@@ -28,6 +28,7 @@ public sealed class ProcessingOptions
     private bool _linkTraces;
     private int _maxDegreeOfParallelism;
     private int _maxMessagesPerTask;
+    private TimeSpan _shutdownGracePeriod;
     private TaskScheduler _taskScheduler;
 
     /// <summary>
@@ -39,6 +40,7 @@ public sealed class ProcessingOptions
         _linkTraces = false;
         _maxDegreeOfParallelism = 1;
         _maxMessagesPerTask = Unbounded;
+        _shutdownGracePeriod = TimeSpan.Zero;
         _taskScheduler = TaskScheduler.Default;
     }
 
@@ -90,6 +92,20 @@ public sealed class ProcessingOptions
         }
     }
 
+    /// <summary>
+    /// The amount of time we give the active tasks to finish. The Default is 
TimeSpan.Zero, meaning no graceful shutdown.
+    /// </summary>
+    public TimeSpan ShutdownGracePeriod
+    {
+        get => _shutdownGracePeriod;
+        set
+        {
+            if (value < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(value), value, 
"ShutdownGracePeriod must be zero or above");
+            _shutdownGracePeriod = value;
+        }
+    }
+
     /// <summary>
     /// The TaskScheduler to use for scheduling tasks. The default is 
TaskScheduler.Default.
     /// </summary>
diff --git a/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs 
b/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
index 9a6955d..69a7ba2 100644
--- a/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
+++ b/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
@@ -17,10 +17,24 @@ namespace DotPulsar.Tests.Internal;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Internal;
+using System.Threading;
 
 [Trait("Category", "Unit")]
-public sealed class MessageProcessorTests
+public sealed class MessageProcessorTests : IDisposable
 {
+    private bool _taskHasStarted;
+    private bool _taskHasCompleted;
+    private readonly SemaphoreSlim _semaphore;
+    private readonly CancellationTokenSource _cts;
+
+    public MessageProcessorTests()
+    {
+        _taskHasStarted = false;
+        _taskHasCompleted = false;
+        _semaphore = new SemaphoreSlim(1);
+        _cts = new CancellationTokenSource();
+    }
+
     [Theory]
     [InlineAutoData(SubscriptionType.Shared)]
     [InlineAutoData(SubscriptionType.KeyShared)]
@@ -31,6 +45,7 @@ public sealed class MessageProcessorTests
     {
         //Arrange
         consumer.SubscriptionType.Returns(subscriptionType);
+        ValueTask ProcessMessage(IMessage<byte[]> _1, CancellationToken _2) => 
ValueTask.CompletedTask;
 
         //Act
         var exception = Record.Exception(() => new 
MessageProcessor<byte[]>(consumer, ProcessMessage, options));
@@ -39,5 +54,67 @@ public sealed class MessageProcessorTests
         exception.Should().BeOfType<ProcessingException>();
     }
 
-    private static ValueTask ProcessMessage(IMessage<byte[]> _1, 
CancellationToken _2) => ValueTask.CompletedTask;
+    [Theory, AutoData]
+    public async Task 
Process_GivenNoShutdownGracePeriod_ShouldNotLetTaskComplete(
+        [AutoFixture.Xunit2.Frozen] IConsumer<byte[]> consumer,
+        ProcessingOptions options)
+    {
+        //Arrange
+        consumer.Receive(Arg.Any<CancellationToken>()).Returns(_ => 
NewMessage());
+        var uut = new MessageProcessor<byte[]>(consumer, ProcessMessage, 
options);
+
+        //Act
+        await _semaphore.WaitAsync();
+        var processTask = uut.Process(_cts.Token).AsTask();
+        while (!_taskHasStarted) { }
+        _cts.Cancel();
+        await processTask;
+        _semaphore.Release();
+
+        //Assert
+        _taskHasCompleted.Should().BeFalse();
+    }
+
+    [Theory, AutoData]
+    public async Task Process_GivenShutdownGracePeriod_ShouldLetTaskComplete(
+        [AutoFixture.Xunit2.Frozen] IConsumer<byte[]> consumer,
+        ProcessingOptions options)
+    {
+        //Arrange
+        options.ShutdownGracePeriod = TimeSpan.FromHours(1);
+        consumer.Receive(Arg.Any<CancellationToken>()).Returns(_ => 
NewMessage());
+        var uut = new MessageProcessor<byte[]>(consumer, ProcessMessage, 
options);
+
+        //Act
+        await _semaphore.WaitAsync();
+        var processTask = uut.Process(_cts.Token).AsTask();
+        while (!_taskHasStarted) { }
+        _cts.Cancel();
+        _semaphore.Release();
+        await processTask;
+
+        //Assert
+        _taskHasCompleted.Should().BeTrue();
+    }
+
+    private async ValueTask ProcessMessage(IMessage<byte[]> _, 
CancellationToken token)
+    {
+        _taskHasStarted = true;
+        await _semaphore.WaitAsync(token);
+        _semaphore.Release();
+        _taskHasCompleted = true;
+    }
+
+    private static IMessage<byte[]> NewMessage()
+    {
+        var message = Substitute.For<IMessage<byte[]>>();
+        message.MessageId.Returns(new MessageId(0, 0, 0, 0));
+        return message;
+    }
+
+    public void Dispose()
+    {
+        _semaphore.Dispose();
+        _cts.Dispose();
+    }
 }

Reply via email to