This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 656a934 Added support for grafeful shutdown while processing messages
656a934 is described below
commit 656a9348a2c12f4649a33271422a4af99dca748b
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Sep 20 16:22:58 2024 +0200
Added support for grafeful shutdown while processing messages
---
CHANGELOG.md | 1 +
src/DotPulsar/Internal/MessageProcessor.cs | 22 +++++-
src/DotPulsar/ProcessingOptions.cs | 16 +++++
.../Internal/MessageProcessorTests.cs | 81 +++++++++++++++++++++-
4 files changed, 115 insertions(+), 5 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6aa11c4..a98ab70 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/)
### Added
- Multiple messages can now be acknowledged with
Acknowledge(IEnumerable\<MessageId> messageIds, CancellationToken
cancellationToken)
+- ProcessingOptions has a new ShutdownGracePeriod property for doing a
graceful shutdown by allowing active tasks to finish
### Changed
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs
b/src/DotPulsar/Internal/MessageProcessor.cs
index caf7020..3ce7492 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -38,6 +38,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
private readonly bool _ensureOrderedAcknowledgment;
private readonly int _maxDegreeOfParallelism;
private readonly int _maxMessagesPerTask;
+ private readonly TimeSpan _shutdownGracePeriod;
private readonly TaskScheduler _taskScheduler;
public MessageProcessor(
@@ -81,6 +82,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
_ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
_maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
_maxMessagesPerTask = options.MaxMessagesPerTask;
+ _shutdownGracePeriod = options.ShutdownGracePeriod;
_taskScheduler = options.TaskScheduler;
}
@@ -92,22 +94,36 @@ public sealed class MessageProcessor<TMessage> : IDisposable
public async ValueTask Process(CancellationToken cancellationToken)
{
+ using var cts = new CancellationTokenSource();
+ using var registration = Link(cts, cancellationToken);
+
for (var i = 1; i < _maxDegreeOfParallelism; ++i)
{
- StartNewProcessorTask(cancellationToken);
+ StartNewProcessorTask(cts.Token);
}
while (true)
{
- StartNewProcessorTask(cancellationToken);
+ if (!cancellationToken.IsCancellationRequested)
+ StartNewProcessorTask(cts.Token);
+
+ if (_processorTasks.Count == 0)
+ return;
+
var completedTask = await
Task.WhenAny(_processorTasks).ConfigureAwait(false);
if (completedTask.IsFaulted)
ExceptionDispatchInfo.Capture(completedTask.Exception!.InnerException!).Throw();
_processorTasks.Remove(completedTask);
- cancellationToken.ThrowIfCancellationRequested();
}
}
+ private CancellationTokenRegistration Link(CancellationTokenSource cts,
CancellationToken cancellationToken)
+ {
+ return _shutdownGracePeriod != TimeSpan.Zero
+ ? cancellationToken.Register(() =>
cts.CancelAfter(_shutdownGracePeriod))
+ : cancellationToken.Register(cts.Cancel);
+ }
+
private async ValueTask Processor(CancellationToken cancellationToken)
{
var messagesProcessed = 0;
diff --git a/src/DotPulsar/ProcessingOptions.cs
b/src/DotPulsar/ProcessingOptions.cs
index 166c4c7..b1a06cc 100644
--- a/src/DotPulsar/ProcessingOptions.cs
+++ b/src/DotPulsar/ProcessingOptions.cs
@@ -28,6 +28,7 @@ public sealed class ProcessingOptions
private bool _linkTraces;
private int _maxDegreeOfParallelism;
private int _maxMessagesPerTask;
+ private TimeSpan _shutdownGracePeriod;
private TaskScheduler _taskScheduler;
/// <summary>
@@ -39,6 +40,7 @@ public sealed class ProcessingOptions
_linkTraces = false;
_maxDegreeOfParallelism = 1;
_maxMessagesPerTask = Unbounded;
+ _shutdownGracePeriod = TimeSpan.Zero;
_taskScheduler = TaskScheduler.Default;
}
@@ -90,6 +92,20 @@ public sealed class ProcessingOptions
}
}
+ /// <summary>
+ /// The amount of time we give the active tasks to finish. The Default is
TimeSpan.Zero, meaning no graceful shutdown.
+ /// </summary>
+ public TimeSpan ShutdownGracePeriod
+ {
+ get => _shutdownGracePeriod;
+ set
+ {
+ if (value < TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException(nameof(value), value,
"ShutdownGracePeriod must be zero or above");
+ _shutdownGracePeriod = value;
+ }
+ }
+
/// <summary>
/// The TaskScheduler to use for scheduling tasks. The default is
TaskScheduler.Default.
/// </summary>
diff --git a/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
b/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
index 9a6955d..69a7ba2 100644
--- a/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
+++ b/tests/DotPulsar.Tests/Internal/MessageProcessorTests.cs
@@ -17,10 +17,24 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal;
+using System.Threading;
[Trait("Category", "Unit")]
-public sealed class MessageProcessorTests
+public sealed class MessageProcessorTests : IDisposable
{
+ private bool _taskHasStarted;
+ private bool _taskHasCompleted;
+ private readonly SemaphoreSlim _semaphore;
+ private readonly CancellationTokenSource _cts;
+
+ public MessageProcessorTests()
+ {
+ _taskHasStarted = false;
+ _taskHasCompleted = false;
+ _semaphore = new SemaphoreSlim(1);
+ _cts = new CancellationTokenSource();
+ }
+
[Theory]
[InlineAutoData(SubscriptionType.Shared)]
[InlineAutoData(SubscriptionType.KeyShared)]
@@ -31,6 +45,7 @@ public sealed class MessageProcessorTests
{
//Arrange
consumer.SubscriptionType.Returns(subscriptionType);
+ ValueTask ProcessMessage(IMessage<byte[]> _1, CancellationToken _2) =>
ValueTask.CompletedTask;
//Act
var exception = Record.Exception(() => new
MessageProcessor<byte[]>(consumer, ProcessMessage, options));
@@ -39,5 +54,67 @@ public sealed class MessageProcessorTests
exception.Should().BeOfType<ProcessingException>();
}
- private static ValueTask ProcessMessage(IMessage<byte[]> _1,
CancellationToken _2) => ValueTask.CompletedTask;
+ [Theory, AutoData]
+ public async Task
Process_GivenNoShutdownGracePeriod_ShouldNotLetTaskComplete(
+ [AutoFixture.Xunit2.Frozen] IConsumer<byte[]> consumer,
+ ProcessingOptions options)
+ {
+ //Arrange
+ consumer.Receive(Arg.Any<CancellationToken>()).Returns(_ =>
NewMessage());
+ var uut = new MessageProcessor<byte[]>(consumer, ProcessMessage,
options);
+
+ //Act
+ await _semaphore.WaitAsync();
+ var processTask = uut.Process(_cts.Token).AsTask();
+ while (!_taskHasStarted) { }
+ _cts.Cancel();
+ await processTask;
+ _semaphore.Release();
+
+ //Assert
+ _taskHasCompleted.Should().BeFalse();
+ }
+
+ [Theory, AutoData]
+ public async Task Process_GivenShutdownGracePeriod_ShouldLetTaskComplete(
+ [AutoFixture.Xunit2.Frozen] IConsumer<byte[]> consumer,
+ ProcessingOptions options)
+ {
+ //Arrange
+ options.ShutdownGracePeriod = TimeSpan.FromHours(1);
+ consumer.Receive(Arg.Any<CancellationToken>()).Returns(_ =>
NewMessage());
+ var uut = new MessageProcessor<byte[]>(consumer, ProcessMessage,
options);
+
+ //Act
+ await _semaphore.WaitAsync();
+ var processTask = uut.Process(_cts.Token).AsTask();
+ while (!_taskHasStarted) { }
+ _cts.Cancel();
+ _semaphore.Release();
+ await processTask;
+
+ //Assert
+ _taskHasCompleted.Should().BeTrue();
+ }
+
+ private async ValueTask ProcessMessage(IMessage<byte[]> _,
CancellationToken token)
+ {
+ _taskHasStarted = true;
+ await _semaphore.WaitAsync(token);
+ _semaphore.Release();
+ _taskHasCompleted = true;
+ }
+
+ private static IMessage<byte[]> NewMessage()
+ {
+ var message = Substitute.For<IMessage<byte[]>>();
+ message.MessageId.Returns(new MessageId(0, 0, 0, 0));
+ return message;
+ }
+
+ public void Dispose()
+ {
+ _semaphore.Dispose();
+ _cts.Dispose();
+ }
}