This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 041b269  Added support for topics pattern
041b269 is described below

commit 041b2694f7a782c14986060257f7975f673f60e2
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 31 10:57:02 2025 +0100

    Added support for topics pattern
---
 CHANGELOG.md                                       |  2 +-
 src/DotPulsar/Abstractions/IConsumerBuilder.cs     | 16 ++++++-
 src/DotPulsar/ConsumerOptions.cs                   | 34 ++++++++++++---
 .../InvalidTopicsPatternException.cs}              | 11 ++---
 src/DotPulsar/Internal/Abstractions/IConnection.cs |  1 +
 .../Internal/Abstractions/IConnectionPool.cs       |  4 ++
 src/DotPulsar/Internal/ChannelManager.cs           |  3 ++
 src/DotPulsar/Internal/Connection.cs               | 16 +++++++
 src/DotPulsar/Internal/ConnectionPool.cs           | 49 ++++++++++++++++++++++
 src/DotPulsar/Internal/Consumer.cs                 | 15 ++++++-
 src/DotPulsar/Internal/ConsumerBuilder.cs          | 24 +++++++++--
 .../Internal/Extensions/CommandExtensions.cs       |  9 +++-
 src/DotPulsar/Internal/RequestResponseHandler.cs   |  7 ++++
 ...IConnectionPool.cs => RegexSubscriptionMode.cs} | 22 ++++++++--
 tests/DotPulsar.Tests/IntegrationFixture.cs        |  8 ++++
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs    | 39 +++++++++++++++++
 16 files changed, 237 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee245a0..5e7c0b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,7 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/)
 
 ### Added
 
-- Multi-topic subscriptions
+- Support for multi-topic subscriptions given a list of topics and/or a 
topics pattern
 
 ### Changed
 
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs 
b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index f02145a..a314dc9 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -14,6 +14,8 @@
 
 namespace DotPulsar.Abstractions;
 
+using System.Text.RegularExpressions;
+
 /// <summary>
 /// A consumer building abstraction.
 /// </summary>
@@ -44,6 +46,11 @@ public interface IConsumerBuilder<TMessage>
     /// </summary>
     IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted);
 
+    /// <summary>
+    /// Determines which topics this consumer should be subscribed to - 
Persistent, Non-Persistent, or both. The default is 'Persistent'.
+    /// </summary>
+    IConsumerBuilder<TMessage> RegexSubscriptionMode(RegexSubscriptionMode 
regexSubscriptionMode);
+
     /// <summary>
     /// Whether to replicate the subscription's state across clusters (when 
using geo-replication). The default is 'false'.
     /// </summary>
@@ -70,15 +77,20 @@ public interface IConsumerBuilder<TMessage>
     IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);
 
     /// <summary>
-    /// Set the topic for this consumer. This, or setting multiple topics, is 
required.
+    /// Set the topic for this consumer. This, or setting multiple topics or a 
topic pattern, is required.
     /// </summary>
     IConsumerBuilder<TMessage> Topic(string topic);
 
     /// <summary>
-    /// Set the topics for this consumer. This, or setting a single topic, is 
required.
+    /// Set the topics for this consumer. This, or setting a single topic or a 
topic pattern, is required.
     /// </summary>
     IConsumerBuilder<TMessage> Topics(IEnumerable<string> topics);
 
+    /// <summary>
+    /// Specify a pattern for topics that this consumer subscribes to. This, 
or setting a single topic or multiple topics, is required.
+    /// </summary>
+    IConsumerBuilder<TMessage> TopicsPattern(Regex topicsPattern);
+
     /// <summary>
     /// Create the consumer.
     /// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 669ef87..46c655c 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar;
 
 using DotPulsar.Abstractions;
+using System.Text.RegularExpressions;
 
 /// <summary>
 /// The consumer building options.
@@ -41,6 +42,11 @@ public sealed class ConsumerOptions<TMessage>
     /// </summary>
     public static readonly bool DefaultReadCompacted = false;
 
+    /// <summary>
+    /// The default regex subscription mode.
+    /// </summary>
+    public static readonly RegexSubscriptionMode DefaultRegexSubscriptionMode 
= DotPulsar.RegexSubscriptionMode.Persistent;
+
     /// <summary>
     /// The default of whether to replicate the subscription's state.
     /// </summary>
@@ -53,12 +59,13 @@ public sealed class ConsumerOptions<TMessage>
 
     private readonly HashSet<string> _topics;
 
-    private ConsumerOptions(string subscriptionName, ISchema<TMessage> schema, 
string topic, IEnumerable<string> topics)
+    private ConsumerOptions(string subscriptionName, ISchema<TMessage> schema, 
string topic, Regex? topicsPattern, IEnumerable<string> topics)
     {
         InitialPosition = DefaultInitialPosition;
         PriorityLevel = DefaultPriorityLevel;
         MessagePrefetchCount = DefaultMessagePrefetchCount;
         ReadCompacted = DefaultReadCompacted;
+        RegexSubscriptionMode = DefaultRegexSubscriptionMode;
         ReplicateSubscriptionState = DefaultReplicateSubscriptionState;
         SubscriptionType = DefaultSubscriptionType;
         SubscriptionProperties = [];
@@ -70,19 +77,26 @@ public sealed class ConsumerOptions<TMessage>
         {
             _topics.Add(t);
         }
+        TopicsPattern = topicsPattern;
     }
 
     /// <summary>
     /// Initializes a new instance using the specified subscription name, 
topic and schema.
     /// </summary>
     public ConsumerOptions(string subscriptionName, string topic, 
ISchema<TMessage> schema)
-        : this (subscriptionName, schema, topic, Array.Empty<string>()) { }
+        : this (subscriptionName, schema, topic, null, Array.Empty<string>()) 
{ }
 
     /// <summary>
     /// Initializes a new instance using the specified subscription name, 
topics and schema.
     /// </summary>
     public ConsumerOptions(string subscriptionName, IEnumerable<string> 
topics, ISchema<TMessage> schema)
-        : this(subscriptionName, schema, string.Empty, topics) { }
+        : this(subscriptionName, schema, string.Empty, null, topics) { }
+
+    /// <summary>
+    /// Initializes a new instance using the specified subscription name, 
topics pattern and schema.
+    /// </summary>
+    public ConsumerOptions(string subscriptionName, Regex topicsPattern, 
ISchema<TMessage> schema)
+        : this(subscriptionName, schema, string.Empty, topicsPattern, 
Array.Empty<string>()) { }
 
     /// <summary>
     /// Set the consumer name. This is optional.
@@ -109,6 +123,11 @@ public sealed class ConsumerOptions<TMessage>
     /// </summary>
     public bool ReadCompacted { get; set; }
 
+    /// <summary>
+    /// Determines which topics this consumer should be subscribed to - 
Persistent, Non-Persistent, or both. The default is 'Persistent'.
+    /// </summary>
+    public RegexSubscriptionMode RegexSubscriptionMode { get; set; }
+
     /// <summary>
     /// Whether to replicate the subscription's state across clusters (when 
using geo-replication). The default is 'false'.
     /// </summary>
@@ -140,12 +159,12 @@ public sealed class ConsumerOptions<TMessage>
     public SubscriptionType SubscriptionType { get; set; }
 
     /// <summary>
-    /// Set the topic for this consumer. This, or setting multiple topics, is 
required.
+    /// Set the topic for this consumer. This, or setting multiple topics or a 
topic pattern, is required.
     /// </summary>
     public string Topic { get; set; }
 
     /// <summary>
-    /// Set the topics for this consumer. This, or setting a single topic, is 
required.
+    /// Set the topics for this consumer. This, or setting a single topic or a 
topic pattern, is required.
     /// </summary>
     public IEnumerable<string> Topics
     {
@@ -160,4 +179,9 @@ public sealed class ConsumerOptions<TMessage>
             }
         }
     }
+
+    /// <summary>
+    /// Specify a pattern for topics that this consumer subscribes to. This, 
or setting a single topic or multiple topics, is required.
+    /// </summary>
+    public Regex? TopicsPattern { get; set; }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs 
b/src/DotPulsar/Exceptions/InvalidTopicsPatternException.cs
similarity index 64%
copy from src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
copy to src/DotPulsar/Exceptions/InvalidTopicsPatternException.cs
index dd338bc..3bb5bb0 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Exceptions/InvalidTopicsPatternException.cs
@@ -12,11 +12,12 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
 
-public interface IConnectionPool : IAsyncDisposable
+/// <summary>
+/// The topics pattern is not valid.
+/// </summary>
+public sealed class InvalidTopicsPatternException : DotPulsarException
 {
-    ValueTask<IConnection> FindConnectionForTopic(string topic, 
CancellationToken cancellationToken = default);
-
-    ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken 
cancellationToken = default);
+    public InvalidTopicsPatternException(string message) : base(message) { }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs 
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 2f2a5b8..c92f596 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -40,4 +40,5 @@ public interface IConnection : IStateHolder<ConnectionState>, 
IAsyncDisposable
     Task Send(SendPackage command, TaskCompletionSource<BaseCommand> 
responseTcs, CancellationToken cancellationToken);
     Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken 
cancellationToken);
     Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, 
CancellationToken cancellationToken);
+    Task<BaseCommand> Send(CommandGetTopicsOfNamespace command, 
CancellationToken cancellationToken);
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs 
b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
index dd338bc..b8d40cf 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
@@ -14,9 +14,13 @@
 
 namespace DotPulsar.Internal.Abstractions;
 
+using System.Text.RegularExpressions;
+
 public interface IConnectionPool : IAsyncDisposable
 {
     ValueTask<IConnection> FindConnectionForTopic(string topic, 
CancellationToken cancellationToken = default);
 
     ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken 
cancellationToken = default);
+
+    ValueTask<IEnumerable<string>> GetTopicsOfNamespace(RegexSubscriptionMode 
mode, Regex topicsPattern, CancellationToken cancellationToken = default);
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs 
b/src/DotPulsar/Internal/ChannelManager.cs
index dc7e14a..8302920 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -171,6 +171,9 @@ public sealed class ChannelManager : 
IStateHolder<ChannelManagerState>, IDisposa
     public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
         => _requestResponseHandler.Outgoing(command);
 
+    public Task<BaseCommand> Outgoing(CommandGetTopicsOfNamespace command)
+        => _requestResponseHandler.Outgoing(command);
+
     public Task<BaseCommand> Outgoing(CommandSeek command)
     {
         using (TakeConsumerSenderLock(command.ConsumerId))
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 03f618e..7b7189e 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -296,6 +296,22 @@ public sealed class Connection : IConnection
         return await responseTask.ConfigureAwait(false);
     }
 
+    public async Task<BaseCommand> Send(CommandGetTopicsOfNamespace command, 
CancellationToken cancellationToken)
+    {
+        ThrowIfDisposed();
+
+        Task<BaseCommand>? responseTask;
+
+        using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+        {
+            responseTask = _channelManager.Outgoing(command);
+            var sequence = Serializer.Serialize(command.AsBaseCommand());
+            await _stream.Send(sequence).ConfigureAwait(false);
+        }
+
+        return await responseTask.ConfigureAwait(false);
+    }
+
     private async Task Send(BaseCommand command, CancellationToken 
cancellationToken)
     {
         ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 0e0e540..6d434bc 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -20,6 +20,7 @@ using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System.Collections.Concurrent;
+using System.Text.RegularExpressions;
 
 public sealed class ConnectionPool : IConnectionPool
 {
@@ -232,4 +233,52 @@ public sealed class ConnectionPool : IConnectionPool
 
         return response.PartitionMetadataResponse.Partitions;
     }
+
+    public async ValueTask<IEnumerable<string>> 
GetTopicsOfNamespace(RegexSubscriptionMode mode, Regex topicsPattern, 
CancellationToken cancellationToken = default)
+    {
+        var topicUriPattern = new 
Regex(@"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$", 
RegexOptions.Compiled);
+
+        var patternString = topicsPattern.ToString();
+
+        var match = topicUriPattern.Match(patternString);
+        if (!match.Success)
+            throw new InvalidTopicsPatternException($"The topics pattern 
'{patternString}' is not valid");
+
+        var persistence = match.Groups[1].Value;
+        var tenant = match.Groups[2].Value;
+        var ns = match.Groups[3].Value;
+
+        if (!string.IsNullOrEmpty(persistence))
+        {
+            if (persistence.Equals("persistent"))
+                mode = RegexSubscriptionMode.Persistent;
+            else
+                mode = RegexSubscriptionMode.NonPersistent;
+        }
+
+        var getTopicsOfNamespace = new CommandGetTopicsOfNamespace
+        {
+            mode = (CommandGetTopicsOfNamespace.Mode) mode,
+            Namespace =$"{tenant}/{ns}",
+            TopicsPattern = patternString
+        };
+
+        var connection = await GetConnection(_serviceUrl, 
cancellationToken).ConfigureAwait(false);
+        var response = await connection.Send(getTopicsOfNamespace, 
cancellationToken).ConfigureAwait(false);
+
+        response.Expect(BaseCommand.Type.GetTopicsOfNamespaceResponse);
+
+        if (response.getTopicsOfNamespaceResponse.Filtered)
+            return response.getTopicsOfNamespaceResponse.Topics;
+
+        var topics = new List<string>();
+
+        foreach (var topic in response.getTopicsOfNamespaceResponse.Topics)
+        {
+            if (topicsPattern.Match(topic).Success)
+                topics.Add(topic);
+        }
+
+        return topics;
+    }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index fc6eba0..a472b1f 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -29,7 +29,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private readonly ConsumerOptions<TMessage> _consumerOptions;
     private readonly CancellationTokenSource _cts;
     private readonly IHandleException _exceptionHandler;
-    private readonly IExecute _executor;
+    private readonly Executor _executor;
     private readonly SemaphoreSlim _semaphoreSlim;
     private readonly AsyncLock _lock;
     private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
@@ -60,6 +60,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         SubscriptionType = consumerOptions.SubscriptionType;
         if (!string.IsNullOrEmpty(consumerOptions.Topic))
             Topic = consumerOptions.Topic;
+        else if (consumerOptions.TopicsPattern is not null)
+            Topic = consumerOptions.TopicsPattern.ToString();
         else
             Topic = string.Join(",", consumerOptions.Topics);
         _receiveTasks = [];
@@ -101,6 +103,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private async Task Monitor()
     {
         var userDefinedTopics = new List<string>(_consumerOptions.Topics);
+
         if (!string.IsNullOrEmpty(_consumerOptions.Topic))
             userDefinedTopics.Add(_consumerOptions.Topic);
 
@@ -120,6 +123,14 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
             }
         }
 
+        if (_consumerOptions.TopicsPattern is not null)
+        {
+            topics.AddRange(await 
_connectionPool.GetTopicsOfNamespace(_consumerOptions.RegexSubscriptionMode, 
_consumerOptions.TopicsPattern, _cts.Token).ConfigureAwait(false));
+        }
+
+        if (topics.Count == 0)
+            throw new TopicNotFoundException("No topics were found");
+
         _numberOfSubConsumers = topics.Count;
         var monitoringTasks = new 
Task<ConsumerStateChanged>[_numberOfSubConsumers];
         var states = new ConsumerState[_numberOfSubConsumers];
@@ -450,7 +461,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         return subConsumer;
     }
 
-    private string GetPartitionedTopicName(string topic, int partitionNumber) 
=> $"{topic}-partition-{partitionNumber}";
+    private static string GetPartitionedTopicName(string topic, int 
partitionNumber) => $"{topic}-partition-{partitionNumber}";
 
     private static StateManager<ConsumerState> CreateStateManager()
         => new(ConsumerState.Disconnected, ConsumerState.Closed, 
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs 
b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 28f22fc..90b2880 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -16,6 +16,7 @@ namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
+using System.Text.RegularExpressions;
 
 public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
 {
@@ -26,12 +27,14 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
     private int _priorityLevel;
     private uint _messagePrefetchCount;
     private bool _readCompacted;
+    private RegexSubscriptionMode _regexSubscriptionMode;
     private bool _replicateSubscriptionState;
     private string? _subscriptionName;
     private readonly Dictionary<string, string> _subscriptionProperties;
     private SubscriptionType _subscriptionType;
     private string _topic;
     private readonly HashSet<string> _topics;
+    private Regex? _topicsPattern;
     private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
 
     public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> 
schema)
@@ -42,6 +45,7 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
         _priorityLevel = ConsumerOptions<TMessage>.DefaultPriorityLevel;
         _messagePrefetchCount = 
ConsumerOptions<TMessage>.DefaultMessagePrefetchCount;
         _readCompacted = ConsumerOptions<TMessage>.DefaultReadCompacted;
+        _regexSubscriptionMode = 
ConsumerOptions<TMessage>.DefaultRegexSubscriptionMode;
         _replicateSubscriptionState = 
ConsumerOptions<TMessage>.DefaultReplicateSubscriptionState;
         _subscriptionProperties = [];
         _subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
@@ -79,6 +83,12 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
         return this;
     }
 
+    public IConsumerBuilder<TMessage> 
RegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode)
+    {
+        _regexSubscriptionMode = regexSubscriptionMode;
+        return this;
+    }
+
     public IConsumerBuilder<TMessage> ReplicateSubscriptionState(bool 
replicateSubscriptionState)
     {
         _replicateSubscriptionState = replicateSubscriptionState;
@@ -127,13 +137,19 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
         return this;
     }
 
+    public IConsumerBuilder<TMessage> TopicsPattern(Regex topicsPattern)
+    {
+        _topicsPattern = topicsPattern;
+        return this;
+    }
+
     public IConsumer<TMessage> Create()
     {
         if (string.IsNullOrEmpty(_subscriptionName))
             throw new ConfigurationException("SubscriptionName may not be null 
or empty");
 
-        if (string.IsNullOrEmpty(_topic) && _topics.Count == 0)
-            throw new ConfigurationException("A 'Topic' or multiple 'Topics' 
must be set");
+        if (string.IsNullOrEmpty(_topic) && _topics.Count == 0 && 
_topicsPattern is null)
+            throw new ConfigurationException("A 'Topic', multiple 'Topics', or 
a 'TopicsPattern' must be set");
 
         var options = new ConsumerOptions<TMessage>(_subscriptionName!, 
_topic, _schema)
         {
@@ -142,11 +158,13 @@ public sealed class ConsumerBuilder<TMessage> : 
IConsumerBuilder<TMessage>
             MessagePrefetchCount = _messagePrefetchCount,
             PriorityLevel = _priorityLevel,
             ReadCompacted = _readCompacted,
+            RegexSubscriptionMode = _regexSubscriptionMode,
             ReplicateSubscriptionState = _replicateSubscriptionState,
             StateChangedHandler = _stateChangedHandler,
             SubscriptionProperties = _subscriptionProperties,
             SubscriptionType = _subscriptionType,
-            Topics = _topics
+            Topics = _topics,
+            TopicsPattern = _topicsPattern
         };
 
         return _pulsarClient.CreateConsumer(options);
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs 
b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index e1883f5..9da9e3d 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -205,4 +205,11 @@ public static class CommandExtensions
             CommandType = BaseCommand.Type.PartitionedMetadata,
             PartitionMetadata = command
         };
+
+    public static BaseCommand AsBaseCommand(this CommandGetTopicsOfNamespace 
command)
+        => new()
+        {
+            CommandType = BaseCommand.Type.GetTopicsOfNamespace,
+            getTopicsOfNamespace = command
+        };
 }
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs 
b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 1e5a94c..a7903db 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -40,6 +40,7 @@ public sealed class RequestResponseHandler : IDisposable
         _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => 
StandardRequest.WithProducerId(cmd.CloseProducer.RequestId, 
cmd.CloseProducer.ProducerId));
         _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => 
StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
         
_getResponseIdentifier.Set(BaseCommand.Type.PartitionedMetadataResponse, cmd => 
StandardRequest.WithRequestId(cmd.PartitionMetadataResponse.RequestId));
+        
_getResponseIdentifier.Set(BaseCommand.Type.GetTopicsOfNamespaceResponse, cmd 
=> StandardRequest.WithRequestId(cmd.getTopicsOfNamespaceResponse.RequestId));
         _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, 
cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
         _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, 
cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
         _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => 
StandardRequest.WithRequestId(cmd.Success.RequestId));
@@ -118,6 +119,12 @@ public sealed class RequestResponseHandler : IDisposable
         return 
_requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
     }
 
+    public Task<BaseCommand> Outgoing(CommandGetTopicsOfNamespace command)
+    {
+        command.RequestId = _requestId.FetchNext();
+        return 
_requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
+    }
+
     public Task<BaseCommand> Outgoing(CommandSeek command)
     {
         command.RequestId = _requestId.FetchNext();
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs 
b/src/DotPulsar/RegexSubscriptionMode.cs
similarity index 51%
copy from src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
copy to src/DotPulsar/RegexSubscriptionMode.cs
index dd338bc..6306c46 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/RegexSubscriptionMode.cs
@@ -12,11 +12,25 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar;
 
-public interface IConnectionPool : IAsyncDisposable
+/// <summary>
+/// When subscribing to topics using a regular expression, one can choose to 
subscribe only to a certain type of topic.
+/// </summary>
+public enum RegexSubscriptionMode : byte
 {
-    ValueTask<IConnection> FindConnectionForTopic(string topic, 
CancellationToken cancellationToken = default);
+    /// <summary>
+    /// Only subscribe to persistent topics.
+    /// </summary>
+    Persistent = 0,
 
-    ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken 
cancellationToken = default);
+    /// <summary>
+    /// Only subscribe to non-persistent topics.
+    /// </summary>
+    NonPersistent = 1,
+
+    /// <summary>
+    /// Subscribe to both persistent and non-persistent topics.
+    /// </summary>
+    All = 2,
 }
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs 
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 18c5eec..1d1f126 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -136,6 +136,14 @@ public class IntegrationFixture : IAsyncLifetime
         return topic;
     }
 
+    public async Task CreateTopics(IEnumerable<string> topics, 
CancellationToken cancellationToken)
+    {
+        foreach (var topic in topics)
+        {
+            await CreateTopic(topic, cancellationToken);
+        }
+    }
+
     public async Task CreateTopic(string topic, CancellationToken 
cancellationToken)
     {
         var arguments = $"bin/pulsar-admin topics create {topic}";
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index dfb76e0..70f8d61 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Tests.Internal;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
+using System.Text.RegularExpressions;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
@@ -170,6 +171,36 @@ public sealed class ConsumerTests : IDisposable
         consumed.ShouldBe(produced, true);
     }
 
+    [Fact]
+    public async Task Receive_GivenTopicsPattern_ShouldReceiveAll()
+    {
+        //Arrange
+        var match1 = $"persistent://public/default/match-{Guid.NewGuid():N}";
+        var match2 = $"persistent://public/default/match-{Guid.NewGuid():N}";
+        var nomatch1 = 
$"non-persistent://public/default/match-{Guid.NewGuid():N}";
+        const string nomatch2 = "persistent://public/default/nomatch";
+
+        await _fixture.CreateTopics([match1, match2, nomatch1, nomatch2], 
_cts.Token);
+
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, new 
Regex(@"persistent://public/default/match.*"));
+        await using var producer1 = CreateProducer(client, match1);
+        await using var producer2 = CreateProducer(client, match2);
+        await using var producer3 = CreateProducer(client, nomatch1);
+        await using var producer4 = CreateProducer(client, nomatch2);
+
+        //Act
+        var produced = new List<MessageId>();
+        produced.AddRange(await ProduceMessages(producer1, 10, "test message", 
_cts.Token));
+        produced.AddRange(await ProduceMessages(producer2, 10, "test message", 
_cts.Token));
+        _ = await ProduceMessages(producer3, 10, "test message", _cts.Token);
+        _ = await ProduceMessages(producer4, 10, "test message", _cts.Token);
+        var consumed = await ConsumeMessages(consumer, produced.Count, 
_cts.Token);
+
+        //Assert
+        consumed.ShouldBe(produced, true);
+    }
+
     [Fact]
     public async Task 
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
     {
@@ -371,6 +402,14 @@ public sealed class ConsumerTests : IDisposable
         .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
+    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, Regex 
topicsPattern)
+        => pulsarClient.NewConsumer(Schema.String)
+        .InitialPosition(SubscriptionInitialPosition.Earliest)
+        .SubscriptionName(CreateSubscriptionName())
+        .TopicsPattern(topicsPattern)
+        .StateChangedHandler(_testOutputHelper.Log)
+        .Create();
+
     private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()

Reply via email to