This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 041b269 Added support for topics pattern
041b269 is described below
commit 041b2694f7a782c14986060257f7975f673f60e2
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 31 10:57:02 2025 +0100
Added support for topics pattern
---
CHANGELOG.md | 2 +-
src/DotPulsar/Abstractions/IConsumerBuilder.cs | 16 ++++++-
src/DotPulsar/ConsumerOptions.cs | 34 ++++++++++++---
.../InvalidTopicsPatternException.cs} | 11 ++---
src/DotPulsar/Internal/Abstractions/IConnection.cs | 1 +
.../Internal/Abstractions/IConnectionPool.cs | 4 ++
src/DotPulsar/Internal/ChannelManager.cs | 3 ++
src/DotPulsar/Internal/Connection.cs | 16 +++++++
src/DotPulsar/Internal/ConnectionPool.cs | 49 ++++++++++++++++++++++
src/DotPulsar/Internal/Consumer.cs | 15 ++++++-
src/DotPulsar/Internal/ConsumerBuilder.cs | 24 +++++++++--
.../Internal/Extensions/CommandExtensions.cs | 9 +++-
src/DotPulsar/Internal/RequestResponseHandler.cs | 7 ++++
...IConnectionPool.cs => RegexSubscriptionMode.cs} | 22 ++++++++--
tests/DotPulsar.Tests/IntegrationFixture.cs | 8 ++++
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 39 +++++++++++++++++
16 files changed, 237 insertions(+), 23 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee245a0..5e7c0b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Added
-- Multi-topic subscriptions
+- Support for multi-topic subscriptions given either a list of topics and/or a topics pattern
### Changed
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index f02145a..a314dc9 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -14,6 +14,8 @@
namespace DotPulsar.Abstractions;
+using System.Text.RegularExpressions;
+
/// <summary>
/// A consumer building abstraction.
/// </summary>
@@ -44,6 +46,11 @@ public interface IConsumerBuilder<TMessage>
/// </summary>
IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted);
+ /// <summary>
+ /// Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. The default is 'Persistent'.
+ /// </summary>
+ IConsumerBuilder<TMessage> RegexSubscriptionMode(RegexSubscriptionMode
regexSubscriptionMode);
+
/// <summary>
/// Whether to replicate the subscription's state across clusters (when
using geo-replication). The default is 'false'.
/// </summary>
@@ -70,15 +77,20 @@ public interface IConsumerBuilder<TMessage>
IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);
/// <summary>
- /// Set the topic for this consumer. This, or setting multiple topics, is
required.
+ /// Set the topic for this consumer. This, or setting multiple topics or a
topic pattern, is required.
/// </summary>
IConsumerBuilder<TMessage> Topic(string topic);
/// <summary>
- /// Set the topics for this consumer. This, or setting a single topic, is
required.
+ /// Set the topics for this consumer. This, or setting a single topic or a
topic pattern, is required.
/// </summary>
IConsumerBuilder<TMessage> Topics(IEnumerable<string> topics);
+ /// <summary>
+ /// Specify a pattern for topics that this consumer subscribes to. This, or setting a single topic or multiple topics, is required.
+ /// </summary>
+ IConsumerBuilder<TMessage> TopicsPattern(Regex topicsPattern);
+
/// <summary>
/// Create the consumer.
/// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 669ef87..46c655c 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -15,6 +15,7 @@
namespace DotPulsar;
using DotPulsar.Abstractions;
+using System.Text.RegularExpressions;
/// <summary>
/// The consumer building options.
@@ -41,6 +42,11 @@ public sealed class ConsumerOptions<TMessage>
/// </summary>
public static readonly bool DefaultReadCompacted = false;
+ /// <summary>
+ /// The default regex subscription mode.
+ /// </summary>
+ public static readonly RegexSubscriptionMode DefaultRegexSubscriptionMode
= DotPulsar.RegexSubscriptionMode.Persistent;
+
/// <summary>
/// The default of whether to replicate the subscription's state.
/// </summary>
@@ -53,12 +59,13 @@ public sealed class ConsumerOptions<TMessage>
private readonly HashSet<string> _topics;
- private ConsumerOptions(string subscriptionName, ISchema<TMessage> schema,
string topic, IEnumerable<string> topics)
+ private ConsumerOptions(string subscriptionName, ISchema<TMessage> schema,
string topic, Regex? topicsPattern, IEnumerable<string> topics)
{
InitialPosition = DefaultInitialPosition;
PriorityLevel = DefaultPriorityLevel;
MessagePrefetchCount = DefaultMessagePrefetchCount;
ReadCompacted = DefaultReadCompacted;
+ RegexSubscriptionMode = DefaultRegexSubscriptionMode;
ReplicateSubscriptionState = DefaultReplicateSubscriptionState;
SubscriptionType = DefaultSubscriptionType;
SubscriptionProperties = [];
@@ -70,19 +77,26 @@ public sealed class ConsumerOptions<TMessage>
{
_topics.Add(t);
}
+ TopicsPattern = topicsPattern;
}
/// <summary>
/// Initializes a new instance using the specified subscription name,
topic and schema.
/// </summary>
public ConsumerOptions(string subscriptionName, string topic,
ISchema<TMessage> schema)
- : this (subscriptionName, schema, topic, Array.Empty<string>()) { }
+ : this (subscriptionName, schema, topic, null, Array.Empty<string>())
{ }
/// <summary>
/// Initializes a new instance using the specified subscription name,
topics and schema.
/// </summary>
public ConsumerOptions(string subscriptionName, IEnumerable<string>
topics, ISchema<TMessage> schema)
- : this(subscriptionName, schema, string.Empty, topics) { }
+ : this(subscriptionName, schema, string.Empty, null, topics) { }
+
+ /// <summary>
+ /// Initializes a new instance using the specified subscription name,
topics pattern and schema.
+ /// </summary>
+ public ConsumerOptions(string subscriptionName, Regex topicsPattern,
ISchema<TMessage> schema)
+ : this(subscriptionName, schema, string.Empty, topicsPattern,
Array.Empty<string>()) { }
/// <summary>
/// Set the consumer name. This is optional.
@@ -109,6 +123,11 @@ public sealed class ConsumerOptions<TMessage>
/// </summary>
public bool ReadCompacted { get; set; }
+ /// <summary>
+ /// Determines which topics this consumer should be subscribed to -
Persistent, Non-Persistent, or both. The default is 'Persistent'.
+ /// </summary>
+ public RegexSubscriptionMode RegexSubscriptionMode { get; set; }
+
/// <summary>
/// Whether to replicate the subscription's state across clusters (when
using geo-replication). The default is 'false'.
/// </summary>
@@ -140,12 +159,12 @@ public sealed class ConsumerOptions<TMessage>
public SubscriptionType SubscriptionType { get; set; }
/// <summary>
- /// Set the topic for this consumer. This, or setting multiple topics, is
required.
+ /// Set the topic for this consumer. This, or setting multiple topics or a
topic pattern, is required.
/// </summary>
public string Topic { get; set; }
/// <summary>
- /// Set the topics for this consumer. This, or setting a single topic, is
required.
+ /// Set the topics for this consumer. This, or setting a single topic or a
topic pattern, is required.
/// </summary>
public IEnumerable<string> Topics
{
@@ -160,4 +179,9 @@ public sealed class ConsumerOptions<TMessage>
}
}
}
+
+ /// <summary>
+ /// Specify a pattern for topics that this consumer subscribes to. This,
or setting a single topic or multiple topics, is required.
+ /// </summary>
+ public Regex? TopicsPattern { get; set; }
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
b/src/DotPulsar/Exceptions/InvalidTopicsPatternException.cs
similarity index 64%
copy from src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
copy to src/DotPulsar/Exceptions/InvalidTopicsPatternException.cs
index dd338bc..3bb5bb0 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Exceptions/InvalidTopicsPatternException.cs
@@ -12,11 +12,12 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
-public interface IConnectionPool : IAsyncDisposable
+/// <summary>
+/// The topics pattern is not valid
+/// </summary>
+public sealed class InvalidTopicsPatternException : DotPulsarException
{
- ValueTask<IConnection> FindConnectionForTopic(string topic,
CancellationToken cancellationToken = default);
-
- ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken
cancellationToken = default);
+ public InvalidTopicsPatternException(string message) : base(message) { }
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 2f2a5b8..c92f596 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -40,4 +40,5 @@ public interface IConnection : IStateHolder<ConnectionState>,
IAsyncDisposable
Task Send(SendPackage command, TaskCompletionSource<BaseCommand>
responseTcs, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken
cancellationToken);
Task<BaseCommand> Send(CommandPartitionedTopicMetadata command,
CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandGetTopicsOfNamespace command,
CancellationToken cancellationToken);
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
index dd338bc..b8d40cf 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
@@ -14,9 +14,13 @@
namespace DotPulsar.Internal.Abstractions;
+using System.Text.RegularExpressions;
+
public interface IConnectionPool : IAsyncDisposable
{
ValueTask<IConnection> FindConnectionForTopic(string topic,
CancellationToken cancellationToken = default);
ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken
cancellationToken = default);
+
+ ValueTask<IEnumerable<string>> GetTopicsOfNamespace(RegexSubscriptionMode
mode, Regex topicsPattern, CancellationToken cancellationToken = default);
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs
b/src/DotPulsar/Internal/ChannelManager.cs
index dc7e14a..8302920 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -171,6 +171,9 @@ public sealed class ChannelManager :
IStateHolder<ChannelManagerState>, IDisposa
public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
=> _requestResponseHandler.Outgoing(command);
+ public Task<BaseCommand> Outgoing(CommandGetTopicsOfNamespace command)
+ => _requestResponseHandler.Outgoing(command);
+
public Task<BaseCommand> Outgoing(CommandSeek command)
{
using (TakeConsumerSenderLock(command.ConsumerId))
diff --git a/src/DotPulsar/Internal/Connection.cs
b/src/DotPulsar/Internal/Connection.cs
index 03f618e..7b7189e 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -296,6 +296,22 @@ public sealed class Connection : IConnection
return await responseTask.ConfigureAwait(false);
}
+ public async Task<BaseCommand> Send(CommandGetTopicsOfNamespace command,
CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ Task<BaseCommand>? responseTask;
+
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
+ await _stream.Send(sequence).ConfigureAwait(false);
+ }
+
+ return await responseTask.ConfigureAwait(false);
+ }
+
private async Task Send(BaseCommand command, CancellationToken
cancellationToken)
{
ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs
b/src/DotPulsar/Internal/ConnectionPool.cs
index 0e0e540..6d434bc 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -20,6 +20,7 @@ using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System.Collections.Concurrent;
+using System.Text.RegularExpressions;
public sealed class ConnectionPool : IConnectionPool
{
@@ -232,4 +233,52 @@ public sealed class ConnectionPool : IConnectionPool
return response.PartitionMetadataResponse.Partitions;
}
+
+ public async ValueTask<IEnumerable<string>>
GetTopicsOfNamespace(RegexSubscriptionMode mode, Regex topicsPattern,
CancellationToken cancellationToken = default)
+ {
+ var topicUriPattern = new Regex(@"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$", RegexOptions.Compiled);
+
+ var patternString = topicsPattern.ToString();
+
+ var match = topicUriPattern.Match(patternString);
+ if (!match.Success)
+ throw new InvalidTopicsPatternException($"The topics pattern '{patternString}' is not valid");
+
+ var persistence = match.Groups[1].Value;
+ var tenant = match.Groups[2].Value;
+ var ns = match.Groups[3].Value;
+
+ if (!string.IsNullOrEmpty(persistence))
+ {
+ if (persistence.Equals("persistent"))
+ mode = RegexSubscriptionMode.Persistent;
+ else
+ mode = RegexSubscriptionMode.NonPersistent;
+ }
+
+ var getTopicsOfNamespace = new CommandGetTopicsOfNamespace
+ {
+ mode = (CommandGetTopicsOfNamespace.Mode) mode,
+ Namespace =$"{tenant}/{ns}",
+ TopicsPattern = patternString
+ };
+
+ var connection = await GetConnection(_serviceUrl,
cancellationToken).ConfigureAwait(false);
+ var response = await connection.Send(getTopicsOfNamespace,
cancellationToken).ConfigureAwait(false);
+
+ response.Expect(BaseCommand.Type.GetTopicsOfNamespaceResponse);
+
+ if (response.getTopicsOfNamespaceResponse.Filtered)
+ return response.getTopicsOfNamespaceResponse.Topics;
+
+ var topics = new List<string>();
+
+ foreach (var topic in response.getTopicsOfNamespaceResponse.Topics)
+ {
+ if (topicsPattern.Match(topic).Success)
+ topics.Add(topic);
+ }
+
+ return topics;
+ }
}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index fc6eba0..a472b1f 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -29,7 +29,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
private readonly ConsumerOptions<TMessage> _consumerOptions;
private readonly CancellationTokenSource _cts;
private readonly IHandleException _exceptionHandler;
- private readonly IExecute _executor;
+ private readonly Executor _executor;
private readonly SemaphoreSlim _semaphoreSlim;
private readonly AsyncLock _lock;
private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
@@ -60,6 +60,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
SubscriptionType = consumerOptions.SubscriptionType;
if (!string.IsNullOrEmpty(consumerOptions.Topic))
Topic = consumerOptions.Topic;
+ else if (consumerOptions.TopicsPattern is not null)
+ Topic = consumerOptions.TopicsPattern.ToString();
else
Topic = string.Join(",", consumerOptions.Topics);
_receiveTasks = [];
@@ -101,6 +103,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
private async Task Monitor()
{
var userDefinedTopics = new List<string>(_consumerOptions.Topics);
+
if (!string.IsNullOrEmpty(_consumerOptions.Topic))
userDefinedTopics.Add(_consumerOptions.Topic);
@@ -120,6 +123,14 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
}
}
+ if (_consumerOptions.TopicsPattern is not null)
+ {
+ topics.AddRange(await _connectionPool.GetTopicsOfNamespace(_consumerOptions.RegexSubscriptionMode, _consumerOptions.TopicsPattern, _cts.Token).ConfigureAwait(false));
+ }
+
+ if (topics.Count == 0)
+ throw new TopicNotFoundException("No topics were found");
+
_numberOfSubConsumers = topics.Count;
var monitoringTasks = new
Task<ConsumerStateChanged>[_numberOfSubConsumers];
var states = new ConsumerState[_numberOfSubConsumers];
@@ -450,7 +461,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
return subConsumer;
}
- private string GetPartitionedTopicName(string topic, int partitionNumber)
=> $"{topic}-partition-{partitionNumber}";
+ private static string GetPartitionedTopicName(string topic, int
partitionNumber) => $"{topic}-partition-{partitionNumber}";
private static StateManager<ConsumerState> CreateStateManager()
=> new(ConsumerState.Disconnected, ConsumerState.Closed,
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs
b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 28f22fc..90b2880 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -16,6 +16,7 @@ namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+using System.Text.RegularExpressions;
public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
{
@@ -26,12 +27,14 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
private int _priorityLevel;
private uint _messagePrefetchCount;
private bool _readCompacted;
+ private RegexSubscriptionMode _regexSubscriptionMode;
private bool _replicateSubscriptionState;
private string? _subscriptionName;
private readonly Dictionary<string, string> _subscriptionProperties;
private SubscriptionType _subscriptionType;
private string _topic;
private readonly HashSet<string> _topics;
+ private Regex? _topicsPattern;
private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage>
schema)
@@ -42,6 +45,7 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
_priorityLevel = ConsumerOptions<TMessage>.DefaultPriorityLevel;
_messagePrefetchCount =
ConsumerOptions<TMessage>.DefaultMessagePrefetchCount;
_readCompacted = ConsumerOptions<TMessage>.DefaultReadCompacted;
+ _regexSubscriptionMode =
ConsumerOptions<TMessage>.DefaultRegexSubscriptionMode;
_replicateSubscriptionState =
ConsumerOptions<TMessage>.DefaultReplicateSubscriptionState;
_subscriptionProperties = [];
_subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
@@ -79,6 +83,12 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
return this;
}
+ public IConsumerBuilder<TMessage>
RegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode)
+ {
+ _regexSubscriptionMode = regexSubscriptionMode;
+ return this;
+ }
+
public IConsumerBuilder<TMessage> ReplicateSubscriptionState(bool
replicateSubscriptionState)
{
_replicateSubscriptionState = replicateSubscriptionState;
@@ -127,13 +137,19 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
return this;
}
+ public IConsumerBuilder<TMessage> TopicsPattern(Regex topicsPattern)
+ {
+ _topicsPattern = topicsPattern;
+ return this;
+ }
+
public IConsumer<TMessage> Create()
{
if (string.IsNullOrEmpty(_subscriptionName))
throw new ConfigurationException("SubscriptionName may not be null or empty");
- if (string.IsNullOrEmpty(_topic) && _topics.Count == 0)
- throw new ConfigurationException("A 'Topic' or multiple 'Topics' must be set");
+ if (string.IsNullOrEmpty(_topic) && _topics.Count == 0 && _topicsPattern is null)
+ throw new ConfigurationException("A 'Topic', multiple 'Topics', or a 'TopicsPattern' must be set");
var options = new ConsumerOptions<TMessage>(_subscriptionName!,
_topic, _schema)
{
@@ -142,11 +158,13 @@ public sealed class ConsumerBuilder<TMessage> :
IConsumerBuilder<TMessage>
MessagePrefetchCount = _messagePrefetchCount,
PriorityLevel = _priorityLevel,
ReadCompacted = _readCompacted,
+ RegexSubscriptionMode = _regexSubscriptionMode,
ReplicateSubscriptionState = _replicateSubscriptionState,
StateChangedHandler = _stateChangedHandler,
SubscriptionProperties = _subscriptionProperties,
SubscriptionType = _subscriptionType,
- Topics = _topics
+ Topics = _topics,
+ TopicsPattern = _topicsPattern
};
return _pulsarClient.CreateConsumer(options);
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index e1883f5..9da9e3d 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -205,4 +205,11 @@ public static class CommandExtensions
CommandType = BaseCommand.Type.PartitionedMetadata,
PartitionMetadata = command
};
+
+ public static BaseCommand AsBaseCommand(this CommandGetTopicsOfNamespace
command)
+ => new()
+ {
+ CommandType = BaseCommand.Type.GetTopicsOfNamespace,
+ getTopicsOfNamespace = command
+ };
}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs
b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 1e5a94c..a7903db 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -40,6 +40,7 @@ public sealed class RequestResponseHandler : IDisposable
_getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd =>
StandardRequest.WithProducerId(cmd.CloseProducer.RequestId,
cmd.CloseProducer.ProducerId));
_getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd =>
StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.PartitionedMetadataResponse, cmd =>
StandardRequest.WithRequestId(cmd.PartitionMetadataResponse.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.GetTopicsOfNamespaceResponse, cmd => StandardRequest.WithRequestId(cmd.getTopicsOfNamespaceResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse,
cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse,
cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.Success, cmd =>
StandardRequest.WithRequestId(cmd.Success.RequestId));
@@ -118,6 +119,12 @@ public sealed class RequestResponseHandler : IDisposable
return
_requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
}
+ public Task<BaseCommand> Outgoing(CommandGetTopicsOfNamespace command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ return
_requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
+ }
+
public Task<BaseCommand> Outgoing(CommandSeek command)
{
command.RequestId = _requestId.FetchNext();
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
b/src/DotPulsar/RegexSubscriptionMode.cs
similarity index 51%
copy from src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
copy to src/DotPulsar/RegexSubscriptionMode.cs
index dd338bc..6306c46 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/RegexSubscriptionMode.cs
@@ -12,11 +12,25 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar;
-public interface IConnectionPool : IAsyncDisposable
+/// <summary>
+/// When subscribing to topics using a regular expression, one can specify to only pick a certain type of topics.
+/// </summary>
+public enum RegexSubscriptionMode : byte
{
- ValueTask<IConnection> FindConnectionForTopic(string topic,
CancellationToken cancellationToken = default);
+ /// <summary>
+ /// Only subscribe to persistent topics.
+ /// </summary>
+ Persistent = 0,
- ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken
cancellationToken = default);
+ /// <summary>
+ /// Only subscribe to non-persistent topics.
+ /// </summary>
+ NonPersistent = 1,
+
+ /// <summary>
+ /// Subscribe to both persistent and non-persistent topics.
+ /// </summary>
+ All = 2,
}
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 18c5eec..1d1f126 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -136,6 +136,14 @@ public class IntegrationFixture : IAsyncLifetime
return topic;
}
+ public async Task CreateTopics(IEnumerable<string> topics,
CancellationToken cancellationToken)
+ {
+ foreach (var topic in topics)
+ {
+ await CreateTopic(topic, cancellationToken);
+ }
+ }
+
public async Task CreateTopic(string topic, CancellationToken
cancellationToken)
{
var arguments = $"bin/pulsar-admin topics create {topic}";
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index dfb76e0..70f8d61 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
+using System.Text.RegularExpressions;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
@@ -170,6 +171,36 @@ public sealed class ConsumerTests : IDisposable
consumed.ShouldBe(produced, true);
}
+ [Fact]
+ public async Task Receive_GivenTopicsPattern_ShouldReceiveAll()
+ {
+ //Arrange
+ var match1 = $"persistent://public/default/match-{Guid.NewGuid():N}";
+ var match2 = $"persistent://public/default/match-{Guid.NewGuid():N}";
+ var nomatch1 = $"non-persistent://public/default/match-{Guid.NewGuid():N}";
+ const string nomatch2 = "persistent://public/default/nomatch";
+
+ await _fixture.CreateTopics([match1, match2, nomatch1, nomatch2],
_cts.Token);
+
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, new Regex(@"persistent://public/default/match.*"));
+ await using var producer1 = CreateProducer(client, match1);
+ await using var producer2 = CreateProducer(client, match2);
+ await using var producer3 = CreateProducer(client, nomatch1);
+ await using var producer4 = CreateProducer(client, nomatch2);
+
+ //Act
+ var produced = new List<MessageId>();
+ produced.AddRange(await ProduceMessages(producer1, 10, "test message",
_cts.Token));
+ produced.AddRange(await ProduceMessages(producer2, 10, "test message",
_cts.Token));
+ _ = await ProduceMessages(producer3, 10, "test message", _cts.Token);
+ _ = await ProduceMessages(producer4, 10, "test message", _cts.Token);
+ var consumed = await ConsumeMessages(consumer, produced.Count,
_cts.Token);
+
+ //Assert
+ consumed.ShouldBe(produced, true);
+ }
+
[Fact]
public async Task
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
{
@@ -371,6 +402,14 @@ public sealed class ConsumerTests : IDisposable
.StateChangedHandler(_testOutputHelper.Log)
.Create();
+ private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, Regex
topicsPattern)
+ => pulsarClient.NewConsumer(Schema.String)
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName(CreateSubscriptionName())
+ .TopicsPattern(topicsPattern)
+ .StateChangedHandler(_testOutputHelper.Log)
+ .Create();
+
private IPulsarClient CreateClient()
=> PulsarClient
.Builder()