This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19ef3e6 Added support for multi-topic subscriptions. Needs testing.
19ef3e6 is described below
commit 19ef3e6777755c2a323f55f59173acd2dde8d778
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jan 30 11:39:21 2025 +0100
Added support for multi-topic subscriptions. Needs testing.
---
CHANGELOG.md | 4 +
src/DotPulsar/Abstractions/IConsumerBuilder.cs | 7 +-
src/DotPulsar/ConsumerOptions.cs | 45 ++++-
.../Extensions/ConsumerBuilderExtensions.cs | 11 ++
src/DotPulsar/Internal/Consumer.cs | 214 ++++++++++++---------
src/DotPulsar/Internal/ConsumerBuilder.cs | 26 ++-
src/DotPulsar/Internal/Reader.cs | 2 +
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 33 ++++
8 files changed, 242 insertions(+), 100 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f65181c..ee245a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased]
+### Added
+
+- Multi-topic subscriptions
+
### Changed
- Updated the protobuf-net dependency from version 3.2.45 to 3.2.46
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index 0cd5dc5..f02145a 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -70,10 +70,15 @@ public interface IConsumerBuilder<TMessage>
IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);
/// <summary>
- /// Set the topic for this consumer. This is required.
+ /// Set the topic for this consumer. This, or setting multiple topics, is
required.
/// </summary>
IConsumerBuilder<TMessage> Topic(string topic);
+ /// <summary>
+ /// Set the topics for this consumer. This, or setting a single topic, is
required.
+ /// </summary>
+ IConsumerBuilder<TMessage> Topics(IEnumerable<string> topics);
+
/// <summary>
/// Create the consumer.
/// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 84f185b..669ef87 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -51,10 +51,9 @@ public sealed class ConsumerOptions<TMessage>
/// </summary>
public static readonly SubscriptionType DefaultSubscriptionType =
SubscriptionType.Exclusive;
- /// <summary>
- /// Initializes a new instance using the specified subscription name and
topic.
- /// </summary>
- public ConsumerOptions(string subscriptionName, string topic,
ISchema<TMessage> schema)
+ private readonly HashSet<string> _topics;
+
+ private ConsumerOptions(string subscriptionName, ISchema<TMessage> schema,
string topic, IEnumerable<string> topics)
{
InitialPosition = DefaultInitialPosition;
PriorityLevel = DefaultPriorityLevel;
@@ -64,10 +63,27 @@ public sealed class ConsumerOptions<TMessage>
SubscriptionType = DefaultSubscriptionType;
SubscriptionProperties = [];
SubscriptionName = subscriptionName;
- Topic = topic;
Schema = schema;
+ Topic = topic;
+ _topics = [];
+ foreach (var t in topics)
+ {
+ _topics.Add(t);
+ }
}
+ /// <summary>
+ /// Initializes a new instance using the specified subscription name,
topic and schema.
+ /// </summary>
+ public ConsumerOptions(string subscriptionName, string topic,
ISchema<TMessage> schema)
+ : this (subscriptionName, schema, topic, Array.Empty<string>()) { }
+
+ /// <summary>
+ /// Initializes a new instance using the specified subscription name,
topics and schema.
+ /// </summary>
+ public ConsumerOptions(string subscriptionName, IEnumerable<string>
topics, ISchema<TMessage> schema)
+ : this(subscriptionName, schema, string.Empty, topics) { }
+
/// <summary>
/// Set the consumer name. This is optional.
/// </summary>
@@ -124,7 +140,24 @@ public sealed class ConsumerOptions<TMessage>
public SubscriptionType SubscriptionType { get; set; }
/// <summary>
- /// Set the topic for this consumer. This is required.
+ /// Set the topic for this consumer. This, or setting multiple topics, is
required.
/// </summary>
public string Topic { get; set; }
+
+ /// <summary>
+ /// Set the topics for this consumer. This, or setting a single topic, is
required.
+ /// </summary>
+ public IEnumerable<string> Topics
+ {
+ get => _topics;
+ set
+ {
+ _topics.Clear();
+
+ foreach (var topic in value)
+ {
+ _topics.Add(topic);
+ }
+ }
+ }
}
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
index 91148d5..facfa26 100644
--- a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -57,4 +57,15 @@ public static class ConsumerBuilderExtensions
builder.StateChangedHandler(new
FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken));
return builder;
}
+
+ /// <summary>
+ /// Set the topics for this consumer. This, or setting a single topic, is
required.
+ /// </summary>
+ public static IConsumerBuilder<TMessage> Topics<TMessage>(
+ this IConsumerBuilder<TMessage> builder,
+ params string[] topics)
+ {
+ builder.Topics(topics);
+ return builder;
+ }
}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index dc45883..fc6eba0 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -16,13 +16,13 @@ namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
- private readonly TaskCompletionSource<IMessage<TMessage>>
_emptyTaskCompletionSource;
private readonly IConnectionPool _connectionPool;
private readonly ProcessManager _processManager;
private readonly StateManager<ConsumerState> _state;
@@ -32,13 +32,13 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
private readonly IExecute _executor;
private readonly SemaphoreSlim _semaphoreSlim;
private readonly AsyncLock _lock;
- private SubConsumer<TMessage>[] _subConsumers;
+ private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
+ private readonly LinkedList<Task<IMessage<TMessage>>> _receiveTasks;
+ private Dictionary<string, SubConsumer<TMessage>>.Enumerator
_receiveEnumerator;
+ private SubConsumer<TMessage>? _singleSubConsumer;
private bool _allSubConsumersAreReady;
private int _isDisposed;
- private bool _isPartitionedTopic;
- private int _numberOfPartitions;
- private Task<IMessage<TMessage>>[] _receiveTasks;
- private int _subConsumerIndex;
+ private int _numberOfSubConsumers;
private Exception? _faultException;
public Uri ServiceUrl { get; }
@@ -58,8 +58,11 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
ServiceUrl = serviceUrl;
SubscriptionName = consumerOptions.SubscriptionName;
SubscriptionType = consumerOptions.SubscriptionType;
- Topic = consumerOptions.Topic;
- _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
+ if (!string.IsNullOrEmpty(consumerOptions.Topic))
+ Topic = consumerOptions.Topic;
+ else
+ Topic = string.Join(",", consumerOptions.Topics);
+ _receiveTasks = [];
_cts = new CancellationTokenSource();
_exceptionHandler = exceptionHandler;
_semaphoreSlim = new SemaphoreSlim(1);
@@ -68,12 +71,11 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_consumerOptions = consumerOptions;
_connectionPool = connectionPool;
_exceptionHandler = exceptionHandler;
- _isPartitionedTopic = false;
_allSubConsumersAreReady = false;
_isDisposed = 0;
- _subConsumers = Array.Empty<SubConsumer<TMessage>>();
-
- _emptyTaskCompletionSource = new
TaskCompletionSource<IMessage<TMessage>>();
+ _subConsumers = [];
+ _receiveEnumerator = _subConsumers.GetEnumerator();
+ _singleSubConsumer = null;
_ = Setup();
}
@@ -98,42 +100,62 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
private async Task Monitor()
{
- _numberOfPartitions = Convert.ToInt32(await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
- _isPartitionedTopic = _numberOfPartitions != 0;
- var numberOfSubConsumers = _isPartitionedTopic ? _numberOfPartitions :
1;
- _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubConsumers];
- _subConsumers = new SubConsumer<TMessage>[numberOfSubConsumers];
- var monitoringTasks = new Task<ConsumerState>[numberOfSubConsumers];
- var states = new ConsumerState[numberOfSubConsumers];
- _subConsumerIndex = _isPartitionedTopic ? -1 : 0;
-
- for (var i = 0; i < numberOfSubConsumers; i++)
+ var userDefinedTopics = new List<string>(_consumerOptions.Topics);
+ if (!string.IsNullOrEmpty(_consumerOptions.Topic))
+ userDefinedTopics.Add(_consumerOptions.Topic);
+
+ var topics = new List<string>();
+ foreach (var topic in userDefinedTopics)
+ {
+ var numberOfPartitions = await
_connectionPool.GetNumberOfPartitions(topic, _cts.Token).ConfigureAwait(false);
+ if (numberOfPartitions == 0)
+ {
+ topics.Add(topic);
+ continue;
+ }
+
+ for (var i = 0; i < numberOfPartitions; ++i)
+ {
+ topics.Add(GetPartitionedTopicName(topic, i));
+ }
+ }
+
+ _numberOfSubConsumers = topics.Count;
+ var monitoringTasks = new
Task<ConsumerStateChanged>[_numberOfSubConsumers];
+ var states = new ConsumerState[_numberOfSubConsumers];
+
+ for (var i = 0; i < _numberOfSubConsumers; ++i)
{
- _receiveTasks[i] = _emptyTaskCompletionSource.Task;
- var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) :
Topic;
- _subConsumers[i] = CreateSubConsumer(topicName);
- monitoringTasks[i] =
_subConsumers[i].State.OnStateChangeFrom(ConsumerState.Disconnected,
_cts.Token).AsTask();
+ var topic = topics[i];
+ var subConsumer = CreateSubConsumer(topic);
+ _subConsumers[topic] = subConsumer;
+ monitoringTasks[i] =
subConsumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
}
- _allSubConsumersAreReady = true;
+ if (_numberOfSubConsumers == 1)
+ _singleSubConsumer = _subConsumers.First().Value;
+
+ _receiveEnumerator = _subConsumers.GetEnumerator();
+ _receiveEnumerator.MoveNext();_allSubConsumersAreReady = true;
_semaphoreSlim.Release();
while (true)
{
await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
- for (var i = 0; i < numberOfSubConsumers; ++i)
+ for (var i = 0; i < _numberOfSubConsumers; ++i)
{
var task = monitoringTasks[i];
if (!task.IsCompleted)
continue;
- var state = task.Result;
+ var consumerStateChanged = task.Result;
+ var state = consumerStateChanged.ConsumerState;
states[i] = state;
- monitoringTasks[i] =
_subConsumers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
+ monitoringTasks[i] =
consumerStateChanged.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
}
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
_state.SetState(states[0]);
else if (states.Any(x => x == ConsumerState.Faulted))
_state.SetState(ConsumerState.Faulted);
@@ -164,8 +186,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
foreach (var subConsumer in _subConsumers)
{
- if (subConsumer is not null)
- await subConsumer.DisposeAsync().ConfigureAwait(false);
+ await subConsumer.Value.DisposeAsync().ConfigureAwait(false);
}
await _lock.DisposeAsync().ConfigureAwait(false);
@@ -176,60 +197,77 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
- return await
_subConsumers[_subConsumerIndex].Receive(cancellationToken).ConfigureAwait(false);
+ if (_singleSubConsumer is not null)
+ return await
_singleSubConsumer.Receive(cancellationToken).ConfigureAwait(false);
- var iterations = 0;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
+ var startTopic = string.Empty;
+
while (true)
{
- iterations++;
- _subConsumerIndex++;
- if (_subConsumerIndex == _subConsumers.Length)
- _subConsumerIndex = 0;
-
- var receiveTask = _receiveTasks[_subConsumerIndex];
- if (receiveTask == _emptyTaskCompletionSource.Task)
- {
- var receiveTaskValueTask =
_subConsumers[_subConsumerIndex].Receive(cancellationToken);
- if (receiveTaskValueTask.IsCompleted)
- return receiveTaskValueTask.Result;
- _receiveTasks[_subConsumerIndex] =
receiveTaskValueTask.AsTask();
- }
- else
+ var receiveTaskNode = _receiveTasks.First;
+ while (receiveTaskNode is not null)
{
- if (receiveTask.IsCompleted)
+ if (receiveTaskNode.Value.IsCompleted)
{
- _receiveTasks[_subConsumerIndex] =
_emptyTaskCompletionSource.Task;
- return receiveTask.Result;
+ _receiveTasks.Remove(receiveTaskNode);
+ return receiveTaskNode.Value.Result;
}
+ receiveTaskNode = receiveTaskNode.Next;
+ }
+
+ if (_receiveEnumerator.Current.Key is not null)
+ startTopic = _receiveEnumerator.Current.Key;
+
+ if (!_receiveEnumerator.MoveNext())
+ {
+ _receiveEnumerator = _subConsumers.GetEnumerator();
+ _receiveEnumerator.MoveNext();
}
- if (iterations == _subConsumers.Length)
+
+ var subConsumer = _receiveEnumerator.Current;
+
+ var receiveTask = subConsumer.Value.Receive(_cts.Token);
+ if (receiveTask.IsCompleted)
+ return receiveTask.Result;
+
+ _receiveTasks.AddLast(receiveTask.AsTask());
+
+ if (startTopic == subConsumer.Key)
+ {
+ var tcs = new TaskCompletionSource<IMessage<TMessage>>();
+ using var registration = cancellationToken.Register(() =>
tcs.TrySetCanceled());
+ _receiveTasks.AddLast(tcs.Task);
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
+ _receiveTasks.RemoveLast();
+ cancellationToken.ThrowIfCancellationRequested();
+ }
}
+ }
}
public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
- await _subConsumers[_subConsumerIndex].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
+ if (_singleSubConsumer is not null)
+ await _singleSubConsumer.Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
else
- await _subConsumers[messageId.Partition].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
+ await _subConsumers[messageId.Topic].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken = default)
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
{
- await _subConsumers[_subConsumerIndex].Acknowledge(messageIds,
cancellationToken).ConfigureAwait(false);
+ await _singleSubConsumer.Acknowledge(messageIds,
cancellationToken).ConfigureAwait(false);
return;
}
- var groupedMessageIds = messageIds.GroupBy(messageIds =>
messageIds.Partition);
+ var groupedMessageIds = messageIds.GroupBy(messageIds =>
messageIds.Topic);
var acknowledgeTasks = new List<Task>();
foreach (var group in groupedMessageIds)
{
@@ -243,23 +281,23 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
- await
_subConsumers[_subConsumerIndex].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
+ if (_singleSubConsumer is not null)
+ await _singleSubConsumer.AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
else
- await
_subConsumers[messageId.Partition].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
+ await
_subConsumers[messageId.Topic].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
}
public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
{
- await
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(messageIds,
cancellationToken).ConfigureAwait(false);
+ await
_singleSubConsumer.RedeliverUnacknowledgedMessages(messageIds,
cancellationToken).ConfigureAwait(false);
return;
}
- var groupedMessageIds = messageIds.GroupBy(messageIds =>
messageIds.Partition);
+ var groupedMessageIds = messageIds.GroupBy(messageIds =>
messageIds.Topic);
var redeliverTasks = new List<Task>();
foreach (var group in groupedMessageIds)
{
@@ -272,16 +310,16 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
{
- await
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+ await
_singleSubConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
return;
}
- var redeliverTasks = new List<Task>(_numberOfPartitions);
+ var redeliverTasks = new List<Task>(_numberOfSubConsumers);
foreach (var subConsumer in _subConsumers)
{
-
redeliverTasks.Add(subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
+
redeliverTasks.Add(subConsumer.Value.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
}
await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
}
@@ -290,16 +328,16 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
{
- await
_subConsumers[_subConsumerIndex].Unsubscribe(cancellationToken).ConfigureAwait(false);
+ await
_singleSubConsumer.Unsubscribe(cancellationToken).ConfigureAwait(false);
}
else
{
- var unsubscribeTasks = new List<Task>(_numberOfPartitions);
+ var unsubscribeTasks = new List<Task>(_numberOfSubConsumers);
foreach (var subConsumer in _subConsumers)
{
- var getLastMessageIdTask =
subConsumer.Unsubscribe(cancellationToken);
+ var getLastMessageIdTask =
subConsumer.Value.Unsubscribe(cancellationToken);
unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
}
@@ -313,16 +351,16 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
{
- await _subConsumers[_subConsumerIndex].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ await _singleSubConsumer.Seek(messageId,
cancellationToken).ConfigureAwait(false);
return;
}
- var seekTasks = new List<Task>(_numberOfPartitions);
+ var seekTasks = new List<Task>(_numberOfSubConsumers);
foreach (var subConsumer in _subConsumers)
{
- var getLastMessageIdTask = subConsumer.Seek(messageId,
cancellationToken);
+ var getLastMessageIdTask = subConsumer.Value.Seek(messageId,
cancellationToken);
seekTasks.Add(getLastMessageIdTask.AsTask());
}
await Task.WhenAll(seekTasks).ConfigureAwait(false);
@@ -332,16 +370,16 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
+ if (_singleSubConsumer is not null)
{
- await _subConsumers[_subConsumerIndex].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
+ await _singleSubConsumer.Seek(publishTime,
cancellationToken).ConfigureAwait(false);
return;
}
- var seekTasks = new List<Task>(_numberOfPartitions);
+ var seekTasks = new List<Task>(_numberOfSubConsumers);
foreach (var subConsumer in _subConsumers)
{
- var getLastMessageIdTask = subConsumer.Seek(publishTime,
cancellationToken);
+ var getLastMessageIdTask = subConsumer.Value.Seek(publishTime,
cancellationToken);
seekTasks.Add(getLastMessageIdTask.AsTask());
}
await Task.WhenAll(seekTasks).ConfigureAwait(false);
@@ -351,14 +389,14 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
{
await Guard(cancellationToken).ConfigureAwait(false);
- if (!_isPartitionedTopic)
- return [await
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)];
+ if (_singleSubConsumer is not null)
+ return [await
_singleSubConsumer.GetLastMessageId(cancellationToken).ConfigureAwait(false)];
- var getLastMessageIdsTasks = new
List<Task<MessageId>>(_numberOfPartitions);
+ var getLastMessageIdsTasks = new
List<Task<MessageId>>(_numberOfSubConsumers);
foreach (var subConsumer in _subConsumers)
{
- var getLastMessageIdTask =
subConsumer.GetLastMessageId(cancellationToken);
+ var getLastMessageIdTask =
subConsumer.Value.GetLastMessageId(cancellationToken);
getLastMessageIdsTasks.Add(getLastMessageIdTask.AsTask());
}
@@ -367,7 +405,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
//collect MessageIds
var messageIds = new List<MessageId>();
- for (var i = 0; i < _subConsumers.Length; i++)
+ for (var i = 0; i < _subConsumers.Count; i++)
{
messageIds.Add(getLastMessageIdsTasks[i].Result);
}
@@ -412,7 +450,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
return subConsumer;
}
- private string GetPartitionedTopicName(int partitionNumber) =>
$"{Topic}-partition-{partitionNumber}";
+ private string GetPartitionedTopicName(string topic, int partitionNumber)
=> $"{topic}-partition-{partitionNumber}";
private static StateManager<ConsumerState> CreateStateManager()
=> new(ConsumerState.Disconnected, ConsumerState.Closed,
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs
b/src/DotPulsar/Internal/ConsumerBuilder.cs
index f16883e..28f22fc 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -30,7 +30,8 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
private string? _subscriptionName;
private readonly Dictionary<string, string> _subscriptionProperties;
private SubscriptionType _subscriptionType;
- private string? _topic;
+ private string _topic;
+ private readonly HashSet<string> _topics;
private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage>
schema)
@@ -44,6 +45,8 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
_replicateSubscriptionState =
ConsumerOptions<TMessage>.DefaultReplicateSubscriptionState;
_subscriptionProperties = [];
_subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
+ _topic = string.Empty;
+ _topics = [];
}
public IConsumerBuilder<TMessage> ConsumerName(string name)
@@ -112,15 +115,27 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
return this;
}
+ public IConsumerBuilder<TMessage> Topics(IEnumerable<string> topics)
+ {
+ _topics.Clear();
+
+ foreach (var topic in topics)
+ {
+ _topics.Add(topic);
+ }
+
+ return this;
+ }
+
public IConsumer<TMessage> Create()
{
if (string.IsNullOrEmpty(_subscriptionName))
throw new ConfigurationException("SubscriptionName may not be null
or empty");
- if (string.IsNullOrEmpty(_topic))
- throw new ConfigurationException("Topic may not be null or empty");
+ if (string.IsNullOrEmpty(_topic) && _topics.Count == 0)
+ throw new ConfigurationException("A 'Topic' or multiple 'Topics'
must be set");
- var options = new ConsumerOptions<TMessage>(_subscriptionName!,
_topic!, _schema)
+ var options = new ConsumerOptions<TMessage>(_subscriptionName!,
_topic, _schema)
{
ConsumerName = _consumerName,
InitialPosition = _initialPosition,
@@ -130,7 +145,8 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
ReplicateSubscriptionState = _replicateSubscriptionState,
StateChangedHandler = _stateChangedHandler,
SubscriptionProperties = _subscriptionProperties,
- SubscriptionType = _subscriptionType
+ SubscriptionType = _subscriptionType,
+ Topics = _topics
};
return _pulsarClient.CreateConsumer(options);
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 2043927..ed84add 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -177,6 +177,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
var iterations = 0;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
while (true)
{
iterations++;
@@ -203,6 +204,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
if (iterations == _subReaders.Length)
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 516bad4..dfb76e0 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -145,6 +145,31 @@ public sealed class ConsumerTests : IDisposable
consumed.ShouldBe(produced, true);
}
+ [Fact]
+ public async Task Receive_GivenMultipleTopics_ShouldReceiveAll()
+ {
+ //Arrange
+ const int numberOfMessages = 100;
+ const int partitions = 3;
+
+ var topic = await _fixture.CreateTopic(_cts.Token);
+ var partitionedTopic = await
_fixture.CreatePartitionedTopic(partitions, _cts.Token);
+
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, [topic,
partitionedTopic]);
+ await using var producer = CreateProducer(client, topic);
+ await using var partitionedProducer = CreateProducer(client,
partitionedTopic);
+
+ //Act
+ var produced = new List<MessageId>();
+ produced.AddRange(await ProduceMessages(producer, numberOfMessages,
"test-message", _cts.Token));
+ produced.AddRange(await ProduceMessages(partitionedProducer,
numberOfMessages, "test-message", _cts.Token));
+ var consumed = await ConsumeMessages(consumer, produced.Count,
_cts.Token);
+
+ //Assert
+ consumed.ShouldBe(produced, true);
+ }
+
[Fact]
public async Task
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
{
@@ -338,6 +363,14 @@ public sealed class ConsumerTests : IDisposable
.StateChangedHandler(_testOutputHelper.Log)
.Create();
+ private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
IEnumerable<string> topics)
+ => pulsarClient.NewConsumer(Schema.String)
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName(CreateSubscriptionName())
+ .Topics(topics)
+ .StateChangedHandler(_testOutputHelper.Log)
+ .Create();
+
private IPulsarClient CreateClient()
=> PulsarClient
.Builder()