This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new def5a36 Clean up and added some more tests
def5a36 is described below
commit def5a36de7e2a451a9c9648d14feac07df1fa3c0
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 31 14:31:43 2025 +0100
Clean up and added some more tests
---
src/DotPulsar/ConsumerOptions.cs | 2 +-
.../Internal/Abstractions/IConnectionPool.cs | 3 ++-
src/DotPulsar/Internal/ConnectionPool.cs | 10 ++++----
src/DotPulsar/Internal/Consumer.cs | 17 +++++++------
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 28 ++++++++++++++++++++++
5 files changed, 46 insertions(+), 14 deletions(-)
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 46c655c..f523ce8 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -84,7 +84,7 @@ public sealed class ConsumerOptions<TMessage>
/// Initializes a new instance using the specified subscription name,
topic and schema.
/// </summary>
public ConsumerOptions(string subscriptionName, string topic,
ISchema<TMessage> schema)
- : this (subscriptionName, schema, topic, null, Array.Empty<string>()) { }
+ : this(subscriptionName, schema, topic, null, Array.Empty<string>()) { }
/// <summary>
/// Initializes a new instance using the specified subscription name,
topics and schema.
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
index b8d40cf..e38ceba 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
@@ -14,6 +14,7 @@
namespace DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
using System.Text.RegularExpressions;
public interface IConnectionPool : IAsyncDisposable
@@ -22,5 +23,5 @@ public interface IConnectionPool : IAsyncDisposable
ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken
cancellationToken = default);
- ValueTask<IEnumerable<string>> GetTopicsOfNamespace(RegexSubscriptionMode
mode, Regex topicsPattern, CancellationToken cancellationToken = default);
+ ValueTask<IEnumerable<string>>
GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Mode mode, Regex
topicsPattern, CancellationToken cancellationToken = default);
}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 6d434bc..81ea171 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -234,7 +234,7 @@ public sealed class ConnectionPool : IConnectionPool
return response.PartitionMetadataResponse.Partitions;
}
- public async ValueTask<IEnumerable<string>>
GetTopicsOfNamespace(RegexSubscriptionMode mode, Regex topicsPattern,
CancellationToken cancellationToken = default)
+ public async ValueTask<IEnumerable<string>>
GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Mode mode, Regex
topicsPattern, CancellationToken cancellationToken = default)
{
var topicUriPattern = new
Regex(@"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$",
RegexOptions.Compiled);
@@ -251,15 +251,15 @@ public sealed class ConnectionPool : IConnectionPool
if (!string.IsNullOrEmpty(persistence))
{
if (persistence.Equals("persistent"))
- mode = RegexSubscriptionMode.Persistent;
+ mode = CommandGetTopicsOfNamespace.Mode.Persistent;
else
- mode = RegexSubscriptionMode.NonPersistent;
+ mode = CommandGetTopicsOfNamespace.Mode.NonPersistent;
}
var getTopicsOfNamespace = new CommandGetTopicsOfNamespace
{
- mode = (CommandGetTopicsOfNamespace.Mode) mode,
- Namespace =$"{tenant}/{ns}",
+ mode = mode,
+ Namespace = $"{tenant}/{ns}",
TopicsPattern = patternString
};
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index a472b1f..5b71281 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -116,21 +116,23 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
topics.Add(topic);
continue;
}
-
+
for (var i = 0; i < numberOfPartitions; ++i)
{
topics.Add(GetPartitionedTopicName(topic, i));
}
}
- if (_consumerOptions.TopicsPattern is not null)
+ var pattern = _consumerOptions.TopicsPattern;
+ if (pattern is not null)
{
- topics.AddRange(await
_connectionPool.GetTopicsOfNamespace(_consumerOptions.RegexSubscriptionMode,
_consumerOptions.TopicsPattern, _cts.Token).ConfigureAwait(false));
+ var mode = (CommandGetTopicsOfNamespace.Mode)
_consumerOptions.RegexSubscriptionMode;
+ var foundTopics = await _connectionPool.GetTopicsOfNamespace(mode,
pattern, _cts.Token).ConfigureAwait(false);
+ topics.AddRange(foundTopics);
+ if (topics.Count == 0)
+ throw new TopicNotFoundException($"No topics were found using the pattern '{pattern}'");
}
- if (topics.Count == 0)
- throw new TopicNotFoundException("No topics were found");
-
_numberOfSubConsumers = topics.Count;
var monitoringTasks = new
Task<ConsumerStateChanged>[_numberOfSubConsumers];
var states = new ConsumerState[_numberOfSubConsumers];
@@ -147,7 +149,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_singleSubConsumer = _subConsumers.First().Value;
_receiveEnumerator = _subConsumers.GetEnumerator();
- _receiveEnumerator.MoveNext();_allSubConsumersAreReady = true;
+ _receiveEnumerator.MoveNext();
+ _allSubConsumersAreReady = true;
_semaphoreSlim.Release();
while (true)
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 70f8d61..ea1f080 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -201,6 +201,34 @@ public sealed class ConsumerTests : IDisposable
consumed.ShouldBe(produced, true);
}
+ [Fact]
+ public async Task
Receive_GivenTopicsPatternWithNoMatches_ShouldFaultConsumer()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, new
Regex(@"persistent://public/default/nosuchtopics.*"));
+
+ //Act
+ var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
+
+ //Assert
+ exception.ShouldBeOfType<ConsumerFaultedException>();
+ }
+
+ [Fact]
+ public async Task Receive_GivenInvalidTopicsPattern_ShouldFaultConsumer()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, new
Regex(@"invalid://public/default/match.*"));
+
+ //Act
+ var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
+
+ //Assert
+ exception.ShouldBeOfType<ConsumerFaultedException>();
+ }
+
[Fact]
public async Task
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
{