This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new def5a36  Clean up and added some more tests
def5a36 is described below

commit def5a36de7e2a451a9c9648d14feac07df1fa3c0
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 31 14:31:43 2025 +0100

    Clean up and added some more tests
---
 src/DotPulsar/ConsumerOptions.cs                   |  2 +-
 .../Internal/Abstractions/IConnectionPool.cs       |  3 ++-
 src/DotPulsar/Internal/ConnectionPool.cs           | 10 ++++----
 src/DotPulsar/Internal/Consumer.cs                 | 17 +++++++------
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs    | 28 ++++++++++++++++++++++
 5 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 46c655c..f523ce8 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -84,7 +84,7 @@ public sealed class ConsumerOptions<TMessage>
     /// Initializes a new instance using the specified subscription name, topic and schema.
     /// </summary>
     public ConsumerOptions(string subscriptionName, string topic, ISchema<TMessage> schema)
-        : this (subscriptionName, schema, topic, null, Array.Empty<string>()) { }
+        : this(subscriptionName, schema, topic, null, Array.Empty<string>()) { }
 
     /// <summary>
     /// Initializes a new instance using the specified subscription name, topics and schema.
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
index b8d40cf..e38ceba 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar.Internal.Abstractions;
 
+using DotPulsar.Internal.PulsarApi;
 using System.Text.RegularExpressions;
 
 public interface IConnectionPool : IAsyncDisposable
@@ -22,5 +23,5 @@ public interface IConnectionPool : IAsyncDisposable
 
     ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken = default);
 
-    ValueTask<IEnumerable<string>> GetTopicsOfNamespace(RegexSubscriptionMode mode, Regex topicsPattern, CancellationToken cancellationToken = default);
+    ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Mode mode, Regex topicsPattern, CancellationToken cancellationToken = default);
 }
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 6d434bc..81ea171 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -234,7 +234,7 @@ public sealed class ConnectionPool : IConnectionPool
         return response.PartitionMetadataResponse.Partitions;
     }
 
-    public async ValueTask<IEnumerable<string>> GetTopicsOfNamespace(RegexSubscriptionMode mode, Regex topicsPattern, CancellationToken cancellationToken = default)
+    public async ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Mode mode, Regex topicsPattern, CancellationToken cancellationToken = default)
     {
         var topicUriPattern = new Regex(@"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$", RegexOptions.Compiled);
 
@@ -251,15 +251,15 @@ public sealed class ConnectionPool : IConnectionPool
         if (!string.IsNullOrEmpty(persistence))
         {
             if (persistence.Equals("persistent"))
-                mode = RegexSubscriptionMode.Persistent;
+                mode = CommandGetTopicsOfNamespace.Mode.Persistent;
             else
-                mode = RegexSubscriptionMode.NonPersistent;
+                mode = CommandGetTopicsOfNamespace.Mode.NonPersistent;
         }
 
         var getTopicsOfNamespace = new CommandGetTopicsOfNamespace
         {
-            mode = (CommandGetTopicsOfNamespace.Mode) mode,
-            Namespace =$"{tenant}/{ns}",
+            mode = mode,
+            Namespace = $"{tenant}/{ns}",
             TopicsPattern = patternString
         };
 
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index a472b1f..5b71281 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -116,21 +116,23 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
                 topics.Add(topic);
                 continue;
             }
-            
+
             for (var i = 0; i < numberOfPartitions; ++i)
             {
                 topics.Add(GetPartitionedTopicName(topic, i));
             }
         }
 
-        if (_consumerOptions.TopicsPattern is not null)
+        var pattern = _consumerOptions.TopicsPattern;
+        if (pattern is not null)
         {
-            topics.AddRange(await _connectionPool.GetTopicsOfNamespace(_consumerOptions.RegexSubscriptionMode, _consumerOptions.TopicsPattern, _cts.Token).ConfigureAwait(false));
+            var mode = (CommandGetTopicsOfNamespace.Mode) _consumerOptions.RegexSubscriptionMode;
+            var foundTopics = await _connectionPool.GetTopicsOfNamespace(mode, pattern, _cts.Token).ConfigureAwait(false);
+            topics.AddRange(foundTopics);
+            if (topics.Count == 0)
+                throw new TopicNotFoundException($"No topics were found using the pattern '{pattern}'");
         }
 
-        if (topics.Count == 0)
-            throw new TopicNotFoundException("No topics were found");
-
         _numberOfSubConsumers = topics.Count;
         var monitoringTasks = new Task<ConsumerStateChanged>[_numberOfSubConsumers];
         var states = new ConsumerState[_numberOfSubConsumers];
@@ -147,7 +149,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             _singleSubConsumer = _subConsumers.First().Value;
 
         _receiveEnumerator = _subConsumers.GetEnumerator();
-        _receiveEnumerator.MoveNext();_allSubConsumersAreReady = true;
+        _receiveEnumerator.MoveNext();
+        _allSubConsumersAreReady = true;
         _semaphoreSlim.Release();
 
         while (true)
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 70f8d61..ea1f080 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -201,6 +201,34 @@ public sealed class ConsumerTests : IDisposable
         consumed.ShouldBe(produced, true);
     }
 
+    [Fact]
+    public async Task Receive_GivenTopicsPatternWithNoMatches_ShouldFaultConsumer()
+    {
+        //Arrange
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, new Regex(@"persistent://public/default/nosuchtopics.*"));
+
+        //Act
+        var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
+
+        //Assert
+        exception.ShouldBeOfType<ConsumerFaultedException>();
+    }
+
+    [Fact]
+    public async Task Receive_GivenInvalidTopicsPattern_ShouldFaultConsumer()
+    {
+        //Arrange
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, new Regex(@"invalid://public/default/match.*"));
+
+        //Act
+        var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
+
+        //Assert
+        exception.ShouldBeOfType<ConsumerFaultedException>();
+    }
+
     [Fact]
     public async Task Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
     {

Reply via email to