This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b05cf  Added Acknowledge(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken) to the consumer
64b05cf is described below

commit 64b05cfc47cdf1580a3ae7a063390762ff8ba1d3
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Sep 19 12:46:56 2024 +0200

    Added Acknowledge(IEnumerable<MessageId> messageIds, CancellationToken 
cancellationToken) to the consumer
---
 CHANGELOG.md                              |  4 +++
 benchmarks/Compression/Compression.csproj |  2 +-
 src/DotPulsar/Abstractions/IConsumer.cs   |  5 +++
 src/DotPulsar/Internal/Consumer.cs        | 53 ++++++++++++++++---------------
 src/DotPulsar/Internal/SubConsumer.cs     | 26 +++++++++++++++
 5 files changed, 64 insertions(+), 26 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 023ff2e..6aa11c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/)
 
 ## [Unreleased]
 
+### Added
+
+- Multiple messages can now be acknowledged with 
Acknowledge(IEnumerable\<MessageId> messageIds, CancellationToken 
cancellationToken)
+
 ### Changed
 
 - Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 
8.0.8
diff --git a/benchmarks/Compression/Compression.csproj 
b/benchmarks/Compression/Compression.csproj
index 2bb1057..41f8afb 100644
--- a/benchmarks/Compression/Compression.csproj
+++ b/benchmarks/Compression/Compression.csproj
@@ -10,7 +10,7 @@
   <ItemGroup>
     <PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
     <PackageReference Include="DotNetZip" Version="1.16.0" />
-    <PackageReference Include="Google.Protobuf" Version="3.28.1" />
+    <PackageReference Include="Google.Protobuf" Version="3.28.2" />
     <PackageReference Include="Grpc.Tools" Version="2.66.0">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; 
buildtransitive</IncludeAssets>
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs 
b/src/DotPulsar/Abstractions/IConsumer.cs
index 55ef779..5f90a38 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -24,6 +24,11 @@ public interface IConsumer : IGetLastMessageIds, ISeek, 
IState<ConsumerState>, I
     /// </summary>
     ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken = default);
 
+    /// <summary>
+    /// Acknowledge the consumption of multiple messages using the MessageIds.
+    /// </summary>
+    ValueTask Acknowledge(IEnumerable<MessageId> messageIds, CancellationToken 
cancellationToken = default);
+
     /// <summary>
     /// Acknowledge the consumption of all the messages in the topic up to and 
including the provided MessageId.
     /// </summary>
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 18e326d..0e85429 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -230,6 +230,26 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
             await _subConsumers[messageId.Partition].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
     }
 
+    public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken = default)
+    {
+        await Guard(cancellationToken).ConfigureAwait(false);
+
+        if (!_isPartitionedTopic)
+        {
+            await _subConsumers[_subConsumerIndex].Acknowledge(messageIds, 
cancellationToken).ConfigureAwait(false);
+            return;
+        }
+
+        var groupedMessageIds = messageIds.GroupBy(messageIds => 
messageIds.Partition);
+        var acknowledgeTasks = new List<Task>();
+        foreach (var group in groupedMessageIds)
+        {
+            acknowledgeTasks.Add(_subConsumers[group.Key].Acknowledge(group, 
cancellationToken).AsTask());
+        }
+
+        await Task.WhenAll(acknowledgeTasks).ConfigureAwait(false);
+    }
+
     public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
     {
         await Guard(cancellationToken).ConfigureAwait(false);
@@ -250,32 +270,13 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
             return;
         }
 
-        var messageIdSortedIntoTopics = new Dictionary<int, 
LinkedList<MessageId>>(_numberOfPartitions);
-        //sort messageIds into topics
-        foreach (var messageId in messageIds)
-        {
-            if (messageIdSortedIntoTopics.ContainsKey(messageId.Partition))
-            {
-                
messageIdSortedIntoTopics[messageId.Partition].AddLast(messageId);
-            }
-            else
-            {
-                var linkedList = new LinkedList<MessageId>();
-                linkedList.AddLast(messageId);
-                messageIdSortedIntoTopics.Add(messageId.Partition, linkedList);
-            }
-        }
-        var redeliverUnacknowledgedMessagesTasks = new 
Task[messageIdSortedIntoTopics.Count];
-        var iterations = -1;
-        //Collect tasks from _subConsumers RedeliverUnacknowledgedMessages 
without waiting
-        foreach (var messageIdSortedByPartition in messageIdSortedIntoTopics)
+        var groupedMessageIds = messageIds.GroupBy(messageIds => 
messageIds.Partition);
+        var redeliverTasks = new List<Task>();
+        foreach (var group in groupedMessageIds)
         {
-            iterations++;
-            var task = 
_subConsumers[messageIdSortedByPartition.Key].RedeliverUnacknowledgedMessages(messageIdSortedByPartition.Value,
 cancellationToken).AsTask();
-            redeliverUnacknowledgedMessagesTasks[iterations] = task;
+            
redeliverTasks.Add(_subConsumers[group.Key].RedeliverUnacknowledgedMessages(group,
 cancellationToken).AsTask());
         }
-        //await all of the tasks.
-        await 
Task.WhenAll(redeliverUnacknowledgedMessagesTasks).ConfigureAwait(false);
+        await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
     }
 
     public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
@@ -288,10 +289,12 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
             return;
         }
 
+        var redeliverTasks = new List<Task>(_numberOfPartitions);
         foreach (var subConsumer in _subConsumers)
         {
-            await 
subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+            
redeliverTasks.Add(subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
         }
+        await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
     }
 
     public async ValueTask Unsubscribe(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/SubConsumer.cs 
b/src/DotPulsar/Internal/SubConsumer.cs
index c9eb3e5..a6001fe 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -100,6 +100,9 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
     public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
         => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
 
+    public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken = default)
+        => await InternalAcknowledge(messageIds, 
cancellationToken).ConfigureAwait(false);
+
     public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
         => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
 
@@ -220,6 +223,29 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
             _commandAckPool.Return(commandAck);
         }
     }
+
+    private async ValueTask InternalAcknowledge(IEnumerable<MessageId> 
messageIds, CancellationToken cancellationToken)
+    {
+        var commandAck = _commandAckPool.Get();
+        commandAck.Type = CommandAck.AckType.Individual;
+        commandAck.MessageIds.Clear();
+
+        foreach (var messageId in messageIds)
+        {
+            commandAck.MessageIds.Add(messageId.ToMessageIdData());
+        }
+
+        try
+        {
+            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+        finally
+        {
+            commandAck.MessageIds.Clear();
+            _commandAckPool.Return(commandAck);
+        }
+    }
+
     public ValueTask<IEnumerable<MessageId>> 
GetLastMessageIds(CancellationToken cancellationToken = default) =>
         throw new NotImplementedException();
 }

Reply via email to