This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19ef3e6  Added support for multi-topic subscriptions. Needs testing.
19ef3e6 is described below

commit 19ef3e6777755c2a323f55f59173acd2dde8d778
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jan 30 11:39:21 2025 +0100

    Added support for multi-topic subscriptions. Needs testing.
---
 CHANGELOG.md                                       |   4 +
 src/DotPulsar/Abstractions/IConsumerBuilder.cs     |   7 +-
 src/DotPulsar/ConsumerOptions.cs                   |  45 ++++-
 .../Extensions/ConsumerBuilderExtensions.cs        |  11 ++
 src/DotPulsar/Internal/Consumer.cs                 | 214 ++++++++++++---------
 src/DotPulsar/Internal/ConsumerBuilder.cs          |  26 ++-
 src/DotPulsar/Internal/Reader.cs                   |   2 +
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs    |  33 ++++
 8 files changed, 242 insertions(+), 100 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f65181c..ee245a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/)
 
 ## [Unreleased]
 
+### Added
+
+- Multi-topic subscriptions
+
 ### Changed
 
 - Updated the protobuf-net dependency from version 3.2.45 to 3.2.46
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs 
b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index 0cd5dc5..f02145a 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -70,10 +70,15 @@ public interface IConsumerBuilder<TMessage>
     IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);
 
     /// <summary>
-    /// Set the topic for this consumer. This is required.
+    /// Set the topic for this consumer. This, or setting multiple topics, is 
required.
     /// </summary>
     IConsumerBuilder<TMessage> Topic(string topic);
 
+    /// <summary>
+    /// Set the topics for this consumer. This, or setting a single topic, is 
required.
+    /// </summary>
+    IConsumerBuilder<TMessage> Topics(IEnumerable<string> topics);
+
     /// <summary>
     /// Create the consumer.
     /// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 84f185b..669ef87 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -51,10 +51,9 @@ public sealed class ConsumerOptions<TMessage>
     /// </summary>
     public static readonly SubscriptionType DefaultSubscriptionType = 
SubscriptionType.Exclusive;
 
-    /// <summary>
-    /// Initializes a new instance using the specified subscription name and 
topic.
-    /// </summary>
-    public ConsumerOptions(string subscriptionName, string topic, 
ISchema<TMessage> schema)
+    private readonly HashSet<string> _topics;
+
+    private ConsumerOptions(string subscriptionName, ISchema<TMessage> schema, 
string topic, IEnumerable<string> topics)
     {
         InitialPosition = DefaultInitialPosition;
         PriorityLevel = DefaultPriorityLevel;
@@ -64,10 +63,27 @@ public sealed class ConsumerOptions<TMessage>
         SubscriptionType = DefaultSubscriptionType;
         SubscriptionProperties = [];
         SubscriptionName = subscriptionName;
-        Topic = topic;
         Schema = schema;
+        Topic = topic;
+        _topics = [];
+        foreach (var t in topics)
+        {
+            _topics.Add(t);
+        }
     }
 
+    /// <summary>
+    /// Initializes a new instance using the specified subscription name, 
topic and schema.
+    /// </summary>
+    public ConsumerOptions(string subscriptionName, string topic, 
ISchema<TMessage> schema)
+        : this (subscriptionName, schema, topic, Array.Empty<string>()) { }
+
+    /// <summary>
+    /// Initializes a new instance using the specified subscription name, 
topics and schema.
+    /// </summary>
+    public ConsumerOptions(string subscriptionName, IEnumerable<string> 
topics, ISchema<TMessage> schema)
+        : this(subscriptionName, schema, string.Empty, topics) { }
+
     /// <summary>
     /// Set the consumer name. This is optional.
     /// </summary>
@@ -124,7 +140,24 @@ public sealed class ConsumerOptions<TMessage>
     public SubscriptionType SubscriptionType { get; set; }
 
     /// <summary>
-    /// Set the topic for this consumer. This is required.
+    /// Set the topic for this consumer. This, or setting multiple topics, is 
required.
     /// </summary>
     public string Topic { get; set; }
+
+    /// <summary>
+    /// Set the topics for this consumer. This, or setting a single topic, is 
required.
+    /// </summary>
+    public IEnumerable<string> Topics
+    {
+        get => _topics;
+        set
+        {
+            _topics.Clear();
+
+            foreach (var topic in value)
+            {
+                _topics.Add(topic);
+            }
+        }
+    }
 }
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs 
b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
index 91148d5..facfa26 100644
--- a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -57,4 +57,15 @@ public static class ConsumerBuilderExtensions
         builder.StateChangedHandler(new 
FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken));
         return builder;
     }
+
+    /// <summary>
+    /// Set the topics for this consumer. This, or setting a single topic, is 
required.
+    /// </summary>
+    public static IConsumerBuilder<TMessage> Topics<TMessage>(
+        this IConsumerBuilder<TMessage> builder,
+        params string[] topics)
+    {
+        builder.Topics(topics);
+        return builder;
+    }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index dc45883..fc6eba0 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -16,13 +16,13 @@ namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
+using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
 
 public sealed class Consumer<TMessage> : IConsumer<TMessage>
 {
-    private readonly TaskCompletionSource<IMessage<TMessage>> 
_emptyTaskCompletionSource;
     private readonly IConnectionPool _connectionPool;
     private readonly ProcessManager _processManager;
     private readonly StateManager<ConsumerState> _state;
@@ -32,13 +32,13 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private readonly IExecute _executor;
     private readonly SemaphoreSlim _semaphoreSlim;
     private readonly AsyncLock _lock;
-    private SubConsumer<TMessage>[] _subConsumers;
+    private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
+    private readonly LinkedList<Task<IMessage<TMessage>>> _receiveTasks;
+    private Dictionary<string, SubConsumer<TMessage>>.Enumerator 
_receiveEnumerator;
+    private SubConsumer<TMessage>? _singleSubConsumer;
     private bool _allSubConsumersAreReady;
     private int _isDisposed;
-    private bool _isPartitionedTopic;
-    private int _numberOfPartitions;
-    private Task<IMessage<TMessage>>[] _receiveTasks;
-    private int _subConsumerIndex;
+    private int _numberOfSubConsumers;
     private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
@@ -58,8 +58,11 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         ServiceUrl = serviceUrl;
         SubscriptionName = consumerOptions.SubscriptionName;
         SubscriptionType = consumerOptions.SubscriptionType;
-        Topic = consumerOptions.Topic;
-        _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
+        if (!string.IsNullOrEmpty(consumerOptions.Topic))
+            Topic = consumerOptions.Topic;
+        else
+            Topic = string.Join(",", consumerOptions.Topics);
+        _receiveTasks = [];
         _cts = new CancellationTokenSource();
         _exceptionHandler = exceptionHandler;
         _semaphoreSlim = new SemaphoreSlim(1);
@@ -68,12 +71,11 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _consumerOptions = consumerOptions;
         _connectionPool = connectionPool;
         _exceptionHandler = exceptionHandler;
-        _isPartitionedTopic = false;
         _allSubConsumersAreReady = false;
         _isDisposed = 0;
-        _subConsumers = Array.Empty<SubConsumer<TMessage>>();
-
-        _emptyTaskCompletionSource = new 
TaskCompletionSource<IMessage<TMessage>>();
+        _subConsumers = [];
+        _receiveEnumerator = _subConsumers.GetEnumerator();
+        _singleSubConsumer = null;
 
         _ = Setup();
     }
@@ -98,42 +100,62 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
 
     private async Task Monitor()
     {
-        _numberOfPartitions = Convert.ToInt32(await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
-        _isPartitionedTopic = _numberOfPartitions != 0;
-        var numberOfSubConsumers = _isPartitionedTopic ? _numberOfPartitions : 
1;
-        _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubConsumers];
-        _subConsumers = new SubConsumer<TMessage>[numberOfSubConsumers];
-        var monitoringTasks = new Task<ConsumerState>[numberOfSubConsumers];
-        var states = new ConsumerState[numberOfSubConsumers];
-        _subConsumerIndex = _isPartitionedTopic ? -1 : 0;
-
-        for (var i = 0; i < numberOfSubConsumers; i++)
+        var userDefinedTopics = new List<string>(_consumerOptions.Topics);
+        if (!string.IsNullOrEmpty(_consumerOptions.Topic))
+            userDefinedTopics.Add(_consumerOptions.Topic);
+
+        var topics = new List<string>();
+        foreach (var topic in userDefinedTopics)
+        {
+            var numberOfPartitions = await 
_connectionPool.GetNumberOfPartitions(topic, _cts.Token).ConfigureAwait(false);
+            if (numberOfPartitions == 0)
+            {
+                topics.Add(topic);
+                continue;
+            }
+            
+            for (var i = 0; i < numberOfPartitions; ++i)
+            {
+                topics.Add(GetPartitionedTopicName(topic, i));
+            }
+        }
+
+        _numberOfSubConsumers = topics.Count;
+        var monitoringTasks = new 
Task<ConsumerStateChanged>[_numberOfSubConsumers];
+        var states = new ConsumerState[_numberOfSubConsumers];
+
+        for (var i = 0; i < _numberOfSubConsumers; ++i)
         {
-            _receiveTasks[i] = _emptyTaskCompletionSource.Task;
-            var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : 
Topic;
-            _subConsumers[i] = CreateSubConsumer(topicName);
-            monitoringTasks[i] = 
_subConsumers[i].State.OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
+            var topic = topics[i];
+            var subConsumer = CreateSubConsumer(topic);
+            _subConsumers[topic] = subConsumer;
+            monitoringTasks[i] = 
subConsumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
         }
 
-        _allSubConsumersAreReady = true;
+        if (_numberOfSubConsumers == 1)
+            _singleSubConsumer = _subConsumers.First().Value;
+
+        _receiveEnumerator = _subConsumers.GetEnumerator();
+        _receiveEnumerator.MoveNext();_allSubConsumersAreReady = true;
         _semaphoreSlim.Release();
 
         while (true)
         {
             await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
 
-            for (var i = 0; i < numberOfSubConsumers; ++i)
+            for (var i = 0; i < _numberOfSubConsumers; ++i)
             {
                 var task = monitoringTasks[i];
                 if (!task.IsCompleted)
                     continue;
 
-                var state = task.Result;
+                var consumerStateChanged = task.Result;
+                var state = consumerStateChanged.ConsumerState;
                 states[i] = state;
-                monitoringTasks[i] = 
_subConsumers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
+                monitoringTasks[i] = 
consumerStateChanged.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
             }
 
-            if (!_isPartitionedTopic)
+            if (_singleSubConsumer is not null)
                 _state.SetState(states[0]);
             else if (states.Any(x => x == ConsumerState.Faulted))
                 _state.SetState(ConsumerState.Faulted);
@@ -164,8 +186,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
         foreach (var subConsumer in _subConsumers)
         {
-            if (subConsumer is not null)
-                await subConsumer.DisposeAsync().ConfigureAwait(false);
+            await subConsumer.Value.DisposeAsync().ConfigureAwait(false);
         }
 
         await _lock.DisposeAsync().ConfigureAwait(false);
@@ -176,60 +197,77 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
-            return await 
_subConsumers[_subConsumerIndex].Receive(cancellationToken).ConfigureAwait(false);
+        if (_singleSubConsumer is not null)
+            return await 
_singleSubConsumer.Receive(cancellationToken).ConfigureAwait(false);
 
-        var iterations = 0;
         using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+        {
+            var startTopic = string.Empty;
+
             while (true)
             {
-                iterations++;
-                _subConsumerIndex++;
-                if (_subConsumerIndex == _subConsumers.Length)
-                    _subConsumerIndex = 0;
-
-                var receiveTask = _receiveTasks[_subConsumerIndex];
-                if (receiveTask == _emptyTaskCompletionSource.Task)
-                {
-                    var receiveTaskValueTask = 
_subConsumers[_subConsumerIndex].Receive(cancellationToken);
-                    if (receiveTaskValueTask.IsCompleted)
-                        return receiveTaskValueTask.Result;
-                    _receiveTasks[_subConsumerIndex] = 
receiveTaskValueTask.AsTask();
-                }
-                else
+                var receiveTaskNode = _receiveTasks.First;
+                while (receiveTaskNode is not null)
                 {
-                    if (receiveTask.IsCompleted)
+                    if (receiveTaskNode.Value.IsCompleted)
                     {
-                        _receiveTasks[_subConsumerIndex] = 
_emptyTaskCompletionSource.Task;
-                        return receiveTask.Result;
+                        _receiveTasks.Remove(receiveTaskNode);
+                        return receiveTaskNode.Value.Result;
                     }
+                    receiveTaskNode = receiveTaskNode.Next;
+                }
+
+                if (_receiveEnumerator.Current.Key is not null)
+                    startTopic = _receiveEnumerator.Current.Key;
+
+                if (!_receiveEnumerator.MoveNext())
+                {
+                    _receiveEnumerator = _subConsumers.GetEnumerator();
+                    _receiveEnumerator.MoveNext();
                 }
-                if (iterations == _subConsumers.Length)
+
+                var subConsumer = _receiveEnumerator.Current;
+
+                var receiveTask = subConsumer.Value.Receive(_cts.Token);
+                if (receiveTask.IsCompleted)
+                    return receiveTask.Result;
+
+                _receiveTasks.AddLast(receiveTask.AsTask());
+
+                if (startTopic == subConsumer.Key)
+                {
+                    var tcs = new TaskCompletionSource<IMessage<TMessage>>();
+                    using var registration = cancellationToken.Register(() => 
tcs.TrySetCanceled());
+                    _receiveTasks.AddLast(tcs.Task);
                     await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
+                    _receiveTasks.RemoveLast();
+                    cancellationToken.ThrowIfCancellationRequested();
+                }
             }
+        }
     }
 
     public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
-            await _subConsumers[_subConsumerIndex].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
+        if (_singleSubConsumer is not null)
+            await _singleSubConsumer.Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
         else
-            await _subConsumers[messageId.Partition].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
+            await _subConsumers[messageId.Topic].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken = default)
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
+        if (_singleSubConsumer is not null)
         {
-            await _subConsumers[_subConsumerIndex].Acknowledge(messageIds, 
cancellationToken).ConfigureAwait(false);
+            await _singleSubConsumer.Acknowledge(messageIds, 
cancellationToken).ConfigureAwait(false);
             return;
         }
 
-        var groupedMessageIds = messageIds.GroupBy(messageIds => 
messageIds.Partition);
+        var groupedMessageIds = messageIds.GroupBy(messageIds => 
messageIds.Topic);
         var acknowledgeTasks = new List<Task>();
         foreach (var group in groupedMessageIds)
         {
@@ -243,23 +281,23 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
-            await 
_subConsumers[_subConsumerIndex].AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
+        if (_singleSubConsumer is not null)
+            await _singleSubConsumer.AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
         else
-            await 
_subConsumers[messageId.Partition].AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
+            await 
_subConsumers[messageId.Topic].AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
+        if (_singleSubConsumer is not null)
         {
-            await 
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(messageIds, 
cancellationToken).ConfigureAwait(false);
+            await 
_singleSubConsumer.RedeliverUnacknowledgedMessages(messageIds, 
cancellationToken).ConfigureAwait(false);
             return;
         }
 
-        var groupedMessageIds = messageIds.GroupBy(messageIds => 
messageIds.Partition);
+        var groupedMessageIds = messageIds.GroupBy(messageIds => 
messageIds.Topic);
         var redeliverTasks = new List<Task>();
         foreach (var group in groupedMessageIds)
         {
@@ -272,16 +310,16 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
+        if (_singleSubConsumer is not null)
         {
-            await 
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+            await 
_singleSubConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
             return;
         }
 
-        var redeliverTasks = new List<Task>(_numberOfPartitions);
+        var redeliverTasks = new List<Task>(_numberOfSubConsumers);
         foreach (var subConsumer in _subConsumers)
         {
-            
redeliverTasks.Add(subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
+            
redeliverTasks.Add(subConsumer.Value.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
         }
         await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
     }
@@ -290,16 +328,16 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
+        if (_singleSubConsumer is not null)
         {
-            await 
_subConsumers[_subConsumerIndex].Unsubscribe(cancellationToken).ConfigureAwait(false);
+            await 
_singleSubConsumer.Unsubscribe(cancellationToken).ConfigureAwait(false);
         }
         else
         {
-            var unsubscribeTasks = new List<Task>(_numberOfPartitions);
+            var unsubscribeTasks = new List<Task>(_numberOfSubConsumers);
             foreach (var subConsumer in _subConsumers)
             {
-                var getLastMessageIdTask = 
subConsumer.Unsubscribe(cancellationToken);
+                var getLastMessageIdTask = 
subConsumer.Value.Unsubscribe(cancellationToken);
                 unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
             }
 
@@ -313,16 +351,16 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
+        if (_singleSubConsumer is not null)
         {
-            await _subConsumers[_subConsumerIndex].Seek(messageId, 
cancellationToken).ConfigureAwait(false);
+            await _singleSubConsumer.Seek(messageId, 
cancellationToken).ConfigureAwait(false);
             return;
         }
 
-        var seekTasks = new List<Task>(_numberOfPartitions);
+        var seekTasks = new List<Task>(_numberOfSubConsumers);
         foreach (var subConsumer in _subConsumers)
         {
-            var getLastMessageIdTask = subConsumer.Seek(messageId, 
cancellationToken);
+            var getLastMessageIdTask = subConsumer.Value.Seek(messageId, 
cancellationToken);
             seekTasks.Add(getLastMessageIdTask.AsTask());
         }
         await Task.WhenAll(seekTasks).ConfigureAwait(false);
@@ -332,16 +370,16 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
+        if (_singleSubConsumer is not null)
         {
-            await _subConsumers[_subConsumerIndex].Seek(publishTime, 
cancellationToken).ConfigureAwait(false);
+            await _singleSubConsumer.Seek(publishTime, 
cancellationToken).ConfigureAwait(false);
             return;
         }
 
-        var seekTasks = new List<Task>(_numberOfPartitions);
+        var seekTasks = new List<Task>(_numberOfSubConsumers);
         foreach (var subConsumer in _subConsumers)
         {
-            var getLastMessageIdTask = subConsumer.Seek(publishTime, 
cancellationToken);
+            var getLastMessageIdTask = subConsumer.Value.Seek(publishTime, 
cancellationToken);
             seekTasks.Add(getLastMessageIdTask.AsTask());
         }
         await Task.WhenAll(seekTasks).ConfigureAwait(false);
@@ -351,14 +389,14 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
     {
         await Guard(cancellationToken).ConfigureAwait(false);
 
-        if (!_isPartitionedTopic)
-            return [await 
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)];
+        if (_singleSubConsumer is not null)
+            return [await 
_singleSubConsumer.GetLastMessageId(cancellationToken).ConfigureAwait(false)];
 
-        var getLastMessageIdsTasks = new 
List<Task<MessageId>>(_numberOfPartitions);
+        var getLastMessageIdsTasks = new 
List<Task<MessageId>>(_numberOfSubConsumers);
 
         foreach (var subConsumer in _subConsumers)
         {
-            var getLastMessageIdTask = 
subConsumer.GetLastMessageId(cancellationToken);
+            var getLastMessageIdTask = 
subConsumer.Value.GetLastMessageId(cancellationToken);
             getLastMessageIdsTasks.Add(getLastMessageIdTask.AsTask());
         }
 
@@ -367,7 +405,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
         //collect MessageIds
         var messageIds = new List<MessageId>();
-        for (var i = 0; i < _subConsumers.Length; i++)
+        for (var i = 0; i < _subConsumers.Count; i++)
         {
             messageIds.Add(getLastMessageIdsTasks[i].Result);
         }
@@ -412,7 +450,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         return subConsumer;
     }
 
-    private string GetPartitionedTopicName(int partitionNumber) => 
$"{Topic}-partition-{partitionNumber}";
+    private string GetPartitionedTopicName(string topic, int partitionNumber) 
=> $"{topic}-partition-{partitionNumber}";
 
     private static StateManager<ConsumerState> CreateStateManager()
         => new(ConsumerState.Disconnected, ConsumerState.Closed, 
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs 
b/src/DotPulsar/Internal/ConsumerBuilder.cs
index f16883e..28f22fc 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -30,7 +30,8 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
     private string? _subscriptionName;
     private readonly Dictionary<string, string> _subscriptionProperties;
     private SubscriptionType _subscriptionType;
-    private string? _topic;
+    private string _topic;
+    private readonly HashSet<string> _topics;
     private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
 
     public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> 
schema)
@@ -44,6 +45,8 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
         _replicateSubscriptionState = 
ConsumerOptions<TMessage>.DefaultReplicateSubscriptionState;
         _subscriptionProperties = [];
         _subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
+        _topic = string.Empty;
+        _topics = [];
     }
 
     public IConsumerBuilder<TMessage> ConsumerName(string name)
@@ -112,15 +115,27 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
         return this;
     }
 
+    public IConsumerBuilder<TMessage> Topics(IEnumerable<string> topics)
+    {
+        _topics.Clear();
+
+        foreach (var topic in topics)
+        {
+            _topics.Add(topic);
+        }
+
+        return this;
+    }
+
     public IConsumer<TMessage> Create()
     {
         if (string.IsNullOrEmpty(_subscriptionName))
             throw new ConfigurationException("SubscriptionName may not be null 
or empty");
 
-        if (string.IsNullOrEmpty(_topic))
-            throw new ConfigurationException("Topic may not be null or empty");
+        if (string.IsNullOrEmpty(_topic) && _topics.Count == 0)
+            throw new ConfigurationException("A 'Topic' or multiple 'Topics' 
must be set");
 
-        var options = new ConsumerOptions<TMessage>(_subscriptionName!, 
_topic!, _schema)
+        var options = new ConsumerOptions<TMessage>(_subscriptionName!, 
_topic, _schema)
         {
             ConsumerName = _consumerName,
             InitialPosition = _initialPosition,
@@ -130,7 +145,8 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
             ReplicateSubscriptionState = _replicateSubscriptionState,
             StateChangedHandler = _stateChangedHandler,
             SubscriptionProperties = _subscriptionProperties,
-            SubscriptionType = _subscriptionType
+            SubscriptionType = _subscriptionType,
+            Topics = _topics
         };
 
         return _pulsarClient.CreateConsumer(options);
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 2043927..ed84add 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -177,6 +177,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
 
         var iterations = 0;
         using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+        {
             while (true)
             {
                 iterations++;
@@ -203,6 +204,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
                 if (iterations == _subReaders.Length)
                     await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
             }
+        }
     }
 
     public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 516bad4..dfb76e0 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -145,6 +145,31 @@ public sealed class ConsumerTests : IDisposable
         consumed.ShouldBe(produced, true);
     }
 
+    [Fact]
+    public async Task Receive_GivenMultipleTopics_ShouldReceiveAll()
+    {
+        //Arrange
+        const int numberOfMessages = 100;
+        const int partitions = 3;
+
+        var topic = await _fixture.CreateTopic(_cts.Token);
+        var partitionedTopic = await 
_fixture.CreatePartitionedTopic(partitions, _cts.Token);
+
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, [topic, 
partitionedTopic]);
+        await using var producer = CreateProducer(client, topic);
+        await using var partitionedProducer = CreateProducer(client, 
partitionedTopic);
+
+        //Act
+        var produced = new List<MessageId>();
+        produced.AddRange(await ProduceMessages(producer, numberOfMessages, 
"test-message", _cts.Token));
+        produced.AddRange(await ProduceMessages(partitionedProducer, 
numberOfMessages, "test-message", _cts.Token));
+        var consumed = await ConsumeMessages(consumer, produced.Count, 
_cts.Token);
+
+        //Assert
+        consumed.ShouldBe(produced, true);
+    }
+
     [Fact]
     public async Task 
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
     {
@@ -338,6 +363,14 @@ public sealed class ConsumerTests : IDisposable
         .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
+    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
IEnumerable<string> topics)
+        => pulsarClient.NewConsumer(Schema.String)
+        .InitialPosition(SubscriptionInitialPosition.Earliest)
+        .SubscriptionName(CreateSubscriptionName())
+        .Topics(topics)
+        .StateChangedHandler(_testOutputHelper.Log)
+        .Create();
+
     private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()

Reply via email to