This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 64b05cf Added Acknowledge(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken) to the consumer
64b05cf is described below
commit 64b05cfc47cdf1580a3ae7a063390762ff8ba1d3
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Sep 19 12:46:56 2024 +0200
Added Acknowledge(IEnumerable<MessageId> messageIds, CancellationToken
cancellationToken) to the consumer
---
CHANGELOG.md | 4 +++
benchmarks/Compression/Compression.csproj | 2 +-
src/DotPulsar/Abstractions/IConsumer.cs | 5 +++
src/DotPulsar/Internal/Consumer.cs | 53 ++++++++++++++++---------------
src/DotPulsar/Internal/SubConsumer.cs | 26 +++++++++++++++
5 files changed, 64 insertions(+), 26 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 023ff2e..6aa11c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased]
+### Added
+
+- Multiple messages can now be acknowledged with
Acknowledge(IEnumerable\<MessageId> messageIds, CancellationToken
cancellationToken)
+
### Changed
- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to
8.0.8
diff --git a/benchmarks/Compression/Compression.csproj
b/benchmarks/Compression/Compression.csproj
index 2bb1057..41f8afb 100644
--- a/benchmarks/Compression/Compression.csproj
+++ b/benchmarks/Compression/Compression.csproj
@@ -10,7 +10,7 @@
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="DotNetZip" Version="1.16.0" />
- <PackageReference Include="Google.Protobuf" Version="3.28.1" />
+ <PackageReference Include="Google.Protobuf" Version="3.28.2" />
<PackageReference Include="Grpc.Tools" Version="2.66.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers;
buildtransitive</IncludeAssets>
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs
b/src/DotPulsar/Abstractions/IConsumer.cs
index 55ef779..5f90a38 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -24,6 +24,11 @@ public interface IConsumer : IGetLastMessageIds, ISeek,
IState<ConsumerState>, I
/// </summary>
ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken = default);
+ /// <summary>
+ /// Acknowledge the consumption of multiple messages using the MessageIds.
+ /// </summary>
+ ValueTask Acknowledge(IEnumerable<MessageId> messageIds, CancellationToken
cancellationToken = default);
+
/// <summary>
/// Acknowledge the consumption of all the messages in the topic up to and
including the provided MessageId.
/// </summary>
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 18e326d..0e85429 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -230,6 +230,26 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
await _subConsumers[messageId.Partition].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
}
+ public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken = default)
+ {
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitionedTopic)
+ {
+ await _subConsumers[_subConsumerIndex].Acknowledge(messageIds,
cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ var groupedMessageIds = messageIds.GroupBy(messageIds =>
messageIds.Partition);
+ var acknowledgeTasks = new List<Task>();
+ foreach (var group in groupedMessageIds)
+ {
+ acknowledgeTasks.Add(_subConsumers[group.Key].Acknowledge(group,
cancellationToken).AsTask());
+ }
+
+ await Task.WhenAll(acknowledgeTasks).ConfigureAwait(false);
+ }
+
public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
{
await Guard(cancellationToken).ConfigureAwait(false);
@@ -250,32 +270,13 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
return;
}
- var messageIdSortedIntoTopics = new Dictionary<int,
LinkedList<MessageId>>(_numberOfPartitions);
- //sort messageIds into topics
- foreach (var messageId in messageIds)
- {
- if (messageIdSortedIntoTopics.ContainsKey(messageId.Partition))
- {
-
messageIdSortedIntoTopics[messageId.Partition].AddLast(messageId);
- }
- else
- {
- var linkedList = new LinkedList<MessageId>();
- linkedList.AddLast(messageId);
- messageIdSortedIntoTopics.Add(messageId.Partition, linkedList);
- }
- }
- var redeliverUnacknowledgedMessagesTasks = new
Task[messageIdSortedIntoTopics.Count];
- var iterations = -1;
- //Collect tasks from _subConsumers RedeliverUnacknowledgedMessages
without waiting
- foreach (var messageIdSortedByPartition in messageIdSortedIntoTopics)
+ var groupedMessageIds = messageIds.GroupBy(messageIds =>
messageIds.Partition);
+ var redeliverTasks = new List<Task>();
+ foreach (var group in groupedMessageIds)
{
- iterations++;
- var task =
_subConsumers[messageIdSortedByPartition.Key].RedeliverUnacknowledgedMessages(messageIdSortedByPartition.Value,
cancellationToken).AsTask();
- redeliverUnacknowledgedMessagesTasks[iterations] = task;
+
redeliverTasks.Add(_subConsumers[group.Key].RedeliverUnacknowledgedMessages(group,
cancellationToken).AsTask());
}
- //await all of the tasks.
- await
Task.WhenAll(redeliverUnacknowledgedMessagesTasks).ConfigureAwait(false);
+ await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
}
public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
@@ -288,10 +289,12 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
return;
}
+ var redeliverTasks = new List<Task>(_numberOfPartitions);
foreach (var subConsumer in _subConsumers)
{
- await
subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+
redeliverTasks.Add(subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
}
+ await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
}
public async ValueTask Unsubscribe(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/SubConsumer.cs
b/src/DotPulsar/Internal/SubConsumer.cs
index c9eb3e5..a6001fe 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -100,6 +100,9 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
=> await InternalAcknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
+ public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken = default)
+ => await InternalAcknowledge(messageIds,
cancellationToken).ConfigureAwait(false);
+
public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
=> await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
@@ -220,6 +223,29 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
_commandAckPool.Return(commandAck);
}
}
+
+ private async ValueTask InternalAcknowledge(IEnumerable<MessageId>
messageIds, CancellationToken cancellationToken)
+ {
+ var commandAck = _commandAckPool.Get();
+ commandAck.Type = CommandAck.AckType.Individual;
+ commandAck.MessageIds.Clear();
+
+ foreach (var messageId in messageIds)
+ {
+ commandAck.MessageIds.Add(messageId.ToMessageIdData());
+ }
+
+ try
+ {
+ await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ commandAck.MessageIds.Clear();
+ _commandAckPool.Return(commandAck);
+ }
+ }
+
public ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken = default) =>
throw new NotImplementedException();
}