This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cb94d8  Refactor tests
0cb94d8 is described below

commit 0cb94d81d8f072f9a142399966e2b291b9c4e533
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Sep 20 12:42:37 2023 +0200

    Refactor tests
---
 .../{ => Internal}/ConsumerTests.cs                | 106 +++++------
 .../{ => Internal}/ProducerTests.cs                | 197 ++++++++-------------
 .../DotPulsar.Tests/{ => Internal}/ReaderTests.cs  |  49 +++--
 .../{TokenTests.cs => PulsarClientTests.cs}        |  23 +--
 4 files changed, 143 insertions(+), 232 deletions(-)

diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
similarity index 63%
rename from tests/DotPulsar.Tests/ConsumerTests.cs
rename to tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 4f7414d..3cd6d8b 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -12,7 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Tests;
+namespace DotPulsar.Tests.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
@@ -41,11 +41,8 @@ public class ConsumerTests
     public async Task 
GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
     {
         //Arrange
-        const string topicName = 
$"consumer-{nameof(GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
-        const string subscriptionName = "subscription-given-given-empty-topic";
-        const string consumerName = $"consumer-given-empty-topic";
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client);
 
         //Act
         var actual = await consumer.GetLastMessageId();
@@ -58,20 +55,17 @@ public class ConsumerTests
     public async Task 
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        const string topicName = 
"persistent://public/default/consumer-getlastmessageid-given-non-partitioned-topic";
-        const string subscriptionName = 
"subscription-given-given-partitioned-topic";
-        const string consumerName = $"consumer-given-partitioned-topic";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
         MessageId expected = null!;
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send(content);
+            var messageId = await producer.Send("test-message");
             if (i >= 5)
             {
                 expected = messageId;
@@ -89,14 +83,12 @@ public class ConsumerTests
     public async Task 
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
     {
         //Arrange
-        const string topicName = 
"persistent://public/default/consumer-getlastmessageid-given-partitioned-topic";
-        const string subscriptionName = 
"subscription-given-given-partitioned-topic";
-        const string consumerName = "consumer-given-partitioned-topic";
+        var topicName = CreateTopicName();
         const int partitions = 3;
         _fixture.CreatePartitionedTopic(topicName, partitions);
 
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client, topicName);
 
         //Act
         var exception = await Record.ExceptionAsync(() => 
consumer.GetLastMessageId().AsTask());
@@ -109,20 +101,17 @@ public class ConsumerTests
     public async Task 
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        const string topicName = 
"consumer-getlastmessageids-given-non-partitioned-topic";
-        const string subscriptionName = "subscription-should_have-3-topics";
-        const string consumerName = "consumer-should_have-3-topics";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send(content);
+            var messageId = await producer.Send("test-message");
             if (i >= 5)
             {
                 expected.Add(messageId);
@@ -140,22 +129,19 @@ public class ConsumerTests
     public async Task 
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
     {
         //Arrange
-        const string topicName = 
"consumer-getlastmessageids-given-partitioned-topic";
-        const string subscriptionName = "subscription-should_have-3-topics";
-        const string consumerName = "consumer-should_have-3-topics";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
         const int partitions = 3;
-        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
+        _fixture.CreatePartitionedTopic(topicName, partitions);
 
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send(content);
+            var messageId = await producer.Send("test-message");
             if (i >= 3)
             {
                 expected.Add(messageId);
@@ -173,11 +159,8 @@ public class ConsumerTests
     public async Task 
GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
     {
         //Arrange
-        const string topicName = 
$"consumer-{nameof(GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
-        const string subscriptionName = "subscription-given-given-empty-topic";
-        const string consumerName = $"consumer-given-empty-topic";
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client);
         var expected = new List<MessageId>() { MessageId.Earliest };
 
         //Act
@@ -191,20 +174,17 @@ public class ConsumerTests
     public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        const string topicName = 
"persistent://public/default/consumer-given-topic-with-messages";
-        const string subscriptionName = 
"subscription-given-topic-with-messages";
-        const string consumerName = "consumer-given-topic-with-messages";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 10000;
 
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
         var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
 
         //Act
-        var produced = await ProduceMessages(producer, numberOfMessages, 
content, cts.Token);
+        var produced = await ProduceMessages(producer, numberOfMessages, 
"test-message", cts.Token);
         var consumed = await ConsumeMessages(consumer, numberOfMessages, 
cts.Token);
 
         //Assert
@@ -215,23 +195,20 @@ public class ConsumerTests
     public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        const string subscriptionName = 
"subscription-given-non-partitioned-topic-with-messages";
-        const string consumerName = 
"consumer-given-non-partitioned-topic-with-messages";
-        const string topicName = "consumer-with-3-partitions-test";
-        const string content = "test-message";
-        const int numberOfMessages = 10000;
+        var topicName = CreateTopicName();
+        const int numberOfMessages = 1000;
         const int partitions = 3;
 
-        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
+        _fixture.CreatePartitionedTopic(topicName, partitions);
 
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, topicName, consumerName, 
subscriptionName);
+        await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
         var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
 
         //Act
-        var produced = await ProduceMessages(producer, numberOfMessages, 
content, cts.Token);
+        var produced = await ProduceMessages(producer, numberOfMessages, 
"test-message", cts.Token);
         var consumed = await ConsumeMessages(consumer, numberOfMessages, 
cts.Token);
 
         //Assert
@@ -251,7 +228,7 @@ public class ConsumerTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
+        await using var consumer = CreateConsumer(client);
 
         var receiveTask = consumer.Receive().AsTask();
         semaphoreSlim.Release();
@@ -274,7 +251,7 @@ public class ConsumerTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = CreateConsumer(client, 
SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
+        await using var consumer = CreateConsumer(client);
 
         await consumer.OnStateChangeTo(ConsumerState.Faulted);
 
@@ -322,31 +299,30 @@ public class ConsumerTests
     private void LogState(ProducerStateChanged stateChange)
         => _testOutputHelper.WriteLine($"The producer for topic 
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
 
-    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string topicName)
+    private static string CreateTopicName() => 
$"persistent://public/default/{Guid.NewGuid():N}";
+    private static string CreateConsumerName() => 
$"consumer-{Guid.NewGuid():N}";
+    private static string CreateSubscriptionName() => 
$"subscription-{Guid.NewGuid():N}";
+
+    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string? topicName = null)
         => pulsarClient.NewProducer(Schema.String)
-        .Topic(topicName)
+        .Topic(topicName is null ? CreateTopicName() : topicName)
         .StateChangedHandler(LogState)
         .Create();
 
-    private IConsumer<string> CreateConsumer(
-        IPulsarClient pulsarClient,
-        SubscriptionInitialPosition subscriptionInitialPosition,
-        string topicName,
-        string consumerName,
-        string subscriptionName)
+    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string? topicName = null)
         => pulsarClient.NewConsumer(Schema.String)
-        .ConsumerName(consumerName)
-        .InitialPosition(subscriptionInitialPosition)
-        .SubscriptionName(subscriptionName)
-        .Topic(topicName)
+        .ConsumerName(CreateConsumerName())
+        .InitialPosition(SubscriptionInitialPosition.Earliest)
+        .SubscriptionName(CreateSubscriptionName())
+        .Topic(topicName is null ? CreateTopicName() : topicName)
         .StateChangedHandler(LogState)
         .Create();
 
     private IPulsarClient CreateClient()
         => PulsarClient
-            .Builder()
-            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-            .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: 
{ec.Exception}"))
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Build();
+        .Builder()
+        .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+        .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: 
{ec.Exception}"))
+        .ServiceUrl(_fixture.ServiceUrl)
+        .Build();
 }
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
similarity index 61%
rename from tests/DotPulsar.Tests/ProducerTests.cs
rename to tests/DotPulsar.Tests/Internal/ProducerTests.cs
index a7b8dca..b238409 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -12,14 +12,13 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Tests;
+namespace DotPulsar.Tests.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
 using FluentAssertions;
 using System;
 using System.Collections.Generic;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
@@ -42,48 +41,32 @@ public class ProducerTests
     public async Task 
SimpleProduceConsume_WhenSendingMessagesToProducer_ThenReceiveMessagesFromConsumer()
     {
         //Arrange
-        await using var client = CreateClient();
-        var topicName = $"simple-produce-consume{Guid.NewGuid():N}";
         const string content = "test-message";
+        var topicName = CreateTopicName();
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await using var consumer = CreateConsumer(client, topicName);
 
         //Act
-        await using var producer = client.NewProducer(Schema.String)
-            .Topic(topicName)
-            .Create();
-
-        await using var consumer = client.NewConsumer(Schema.String)
-            .Topic(topicName)
-            .SubscriptionName("test-sub")
-            .InitialPosition(SubscriptionInitialPosition.Earliest)
-            .Create();
-
         await producer.Send(content);
-        _testOutputHelper.WriteLine($"Sent a message: {content}");
+        var message = await consumer.Receive();
 
         //Assert
-        (await consumer.Receive()).Value().Should().Be(content);
+        message.Value().Should().Be(content);
     }
 
     [Fact]
     public async Task 
SimpleProduceConsume_WhenSendingWithChannel_ThenReceiveMessagesFromConsumer()
     {
         //Arrange
-        await using var client = CreateClient();
-        var topicName = $"simple-produce-consume{Guid.NewGuid():N}";
         const string content = "test-message";
+        var topicName = CreateTopicName();
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await using var consumer = CreateConsumer(client, topicName);
         const int msgCount = 3;
 
         //Act
-        await using var producer = client.NewProducer(Schema.String)
-            .Topic(topicName)
-            .Create();
-
-        await using var consumer = client.NewConsumer(Schema.String)
-            .Topic(topicName)
-            .SubscriptionName("test-sub")
-            .InitialPosition(SubscriptionInitialPosition.Earliest)
-            .Create();
-
         for (var i = 0; i < msgCount; i++)
         {
             await producer.SendChannel.Send(content);
@@ -109,28 +92,19 @@ public class ProducerTests
     public async Task 
TwoProducers_WhenConnectingSecond_ThenGoToExpectedState(ProducerAccessMode 
accessMode, ProducerState expectedState)
     {
         //Arrange
-        await using var client = CreateClient();
         var topicName = $"producer-access-mode{Guid.NewGuid():N}";
         var cts = new CancellationTokenSource(TestTimeout);
-
-        await using var producer1 = client.NewProducer(Schema.String)
-            .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 1 
changed to state: {x.ProducerState}"))
-            .ProducerAccessMode(accessMode)
-            .Topic(topicName)
-            .Create();
-        await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
+        await using var client = CreateClient();
 
         //Act
-        await using var producer2 = client.NewProducer(Schema.String)
-            .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 2 
changed to state: {x.ProducerState}"))
-            .ProducerAccessMode(accessMode)
-            .Topic(topicName)
-            .Create();
+        await using var producer1 = CreateProducer(client, topicName, 
accessMode);
+        _ = await producer1.OnStateChangeTo(ProducerState.Connected, 
cts.Token);
 
-        var result = await producer2.OnStateChangeTo(expectedState, cts.Token);
+        await using var producer2 = CreateProducer(client, topicName, 
accessMode);
+        var actualState = await producer2.OnStateChangeTo(expectedState, 
cts.Token);
 
         //Assert
-        result.Should().Be(expectedState);
+        actualState.Should().Be(expectedState);
     }
 
     [Fact]
@@ -138,23 +112,16 @@ public class ProducerTests
     {
         //Arrange
         await using var client = CreateClient();
-        var topicName = $"producer-access-mode{Guid.NewGuid():N}";
+        var topicName = CreateTopicName();
         var cts = new CancellationTokenSource(TestTimeout);
 
-        await using var producer1 = client.NewProducer(Schema.String)
-            .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 1 
changed to state: {x.ProducerState}"))
-            .ProducerAccessMode(ProducerAccessMode.ExclusiveWithFencing)
-            .Topic(topicName)
-            .Create();
+        await using var producer1 = CreateProducer(client, topicName, 
ProducerAccessMode.ExclusiveWithFencing);
         await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
 
         //Act
-        await using var producer2 = client.NewProducer(Schema.String)
-            .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 2 
changed to state: {x.ProducerState}"))
-            .ProducerAccessMode(ProducerAccessMode.ExclusiveWithFencing)
-            .Topic(topicName)
-            .Create();
+        await using var producer2 = CreateProducer(client, topicName, 
ProducerAccessMode.ExclusiveWithFencing);
         await producer2.OnStateChangeTo(ProducerState.Connected, cts.Token);
+
         try
         {
             // We need to send a message to trigger the disconnect
@@ -179,23 +146,15 @@ public class ProducerTests
     public async Task 
TwoProducers_WhenUsingDifferentAccessModes_ThenGoToExpectedStates(ProducerAccessMode
 accessMode1, ProducerAccessMode accessMode2, ProducerState producerState1, 
ProducerState producerState2)
     {
         //Arrange
-        await using var client = CreateClient();
-        var topicName = $"producer-access-mode{Guid.NewGuid():N}";
+        var topicName = CreateTopicName();
         var cts = new CancellationTokenSource(TestTimeout);
+        await using var client = CreateClient();
 
-        await using var producer1 = client.NewProducer(Schema.String)
-            .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 1 
changed to state: {x.ProducerState}"))
-            .ProducerAccessMode(accessMode1)
-            .Topic(topicName)
-            .Create();
+        await using var producer1 = CreateProducer(client, topicName, 
accessMode1);
         await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
 
         //Act
-        await using var producer2 = client.NewProducer(Schema.String)
-            .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 2 
changed to state: {x.ProducerState}"))
-            .ProducerAccessMode(accessMode2)
-            .Topic(topicName)
-            .Create();
+        await using var producer2 = CreateProducer(client, topicName, 
accessMode2);
 
         var result1 = await producer1.OnStateChangeTo(producerState1, 
cts.Token);
         var result2 = await producer2.OnStateChangeTo(producerState2, 
cts.Token);
@@ -212,19 +171,15 @@ public class ProducerTests
         const string content = "test-message";
         const int partitions = 3;
         const int msgCount = 3;
-        var topicName = $"single-partitioned-{Guid.NewGuid():N}";
-        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
+        var topicName = CreateTopicName();
+        _fixture.CreatePartitionedTopic(topicName, partitions);
         await using var client = CreateClient();
 
         //Act
         var consumers = new List<IConsumer<string>>();
         for (var i = 0; i < partitions; ++i)
         {
-            consumers.Add(client.NewConsumer(Schema.String)
-                .Topic($"{topicName}-partition-{i}")
-                .SubscriptionName("test-sub")
-                .InitialPosition(SubscriptionInitialPosition.Earliest)
-                .Create());
+            consumers.Add(CreateConsumer(client, 
$"{topicName}-partition-{i}"));
         }
 
         for (var i = 0; i < partitions; ++i)
@@ -261,25 +216,19 @@ public class ProducerTests
         //Arrange
         await using var client = CreateClient();
 
-        var topicName = $"round-robin-partitioned-{Guid.NewGuid():N}";
+        var topicName = CreateTopicName();
         const string content = "test-message";
         const int partitions = 3;
         var consumers = new List<IConsumer<string>>();
 
-        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
+        _fixture.CreatePartitionedTopic(topicName, partitions);
 
         //Act
-        await using var producer = client.NewProducer(Schema.String)
-            .Topic(topicName)
-            .Create();
+        await using var producer = CreateProducer(client, topicName);
 
         for (var i = 0; i < partitions; ++i)
         {
-            consumers.Add(client.NewConsumer(Schema.String)
-                .Topic($"{topicName}-partition-{i}")
-                .SubscriptionName("test-sub")
-                .InitialPosition(SubscriptionInitialPosition.Earliest)
-                .Create());
+            consumers.Add(CreateConsumer(client, 
$"{topicName}-partition-{i}"));
             await producer.Send($"{content}-{i}");
             _testOutputHelper.WriteLine($"Sent a message to consumer [{i}]");
         }
@@ -295,26 +244,12 @@ public class ProducerTests
     public async Task 
Send_WhenProducingMessagesForOnePartition_ShouldPartitionOnlyBeNegativeOne()
     {
         //Arrange
-        var testRunId = Guid.NewGuid().ToString("N");
         const int numberOfMessages = 10;
-        var topic = $"persistent://public/default/producer-test-{testRunId}";
-
-        await using var client = PulsarClient.Builder()
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-            .Build();
+        var topicName = CreateTopicName();
 
-        await using var consumer = client.NewConsumer(Schema.ByteArray)
-            .ConsumerName($"consumer-{testRunId}")
-            .InitialPosition(SubscriptionInitialPosition.Earliest)
-            .SubscriptionName($"subscription-{testRunId}")
-            .Topic(topic)
-            .Create();
-
-        await using var producer = client.NewProducer(Schema.ByteArray)
-            .ProducerName($"producer-{testRunId}")
-            .Topic(topic)
-            .Create();
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, topicName);
+        await using var producer = CreateProducer(client, topicName);
 
         var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
 
@@ -342,29 +277,15 @@ public class ProducerTests
     public async Task 
Send_WhenProducingMessagesForFourPartitions_ShouldPartitionBeDifferentThanNegativeOne()
     {
         //Arrange
-        var testRunId = Guid.NewGuid().ToString("N");
         const int numberOfMessages = 10;
         const int partitions = 4;
-        var topicName = $"producer-with-4-partitions-test";
+        var topicName = CreateTopicName();
 
-        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
+        _fixture.CreatePartitionedTopic(topicName, partitions);
 
-        await using var client = PulsarClient.Builder()
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-            .Build();
-
-        await using var consumer = client.NewConsumer(Schema.ByteArray)
-            .ConsumerName($"consumer-{testRunId}")
-            .InitialPosition(SubscriptionInitialPosition.Earliest)
-            .SubscriptionName($"subscription-{testRunId}")
-            .Topic(topicName)
-            .Create();
-
-        await using var producer = client.NewProducer(Schema.ByteArray)
-            .ProducerName($"producer-{testRunId}")
-            .Topic(topicName)
-            .Create();
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, topicName);
+        await using var producer = CreateProducer(client, topicName);
 
         var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
 
@@ -388,20 +309,19 @@ public class ProducerTests
         foundNonNegativeOne.Should().Be(true);
     }
 
-    private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<byte[]> producer, int numberOfMessages, 
CancellationToken ct)
+    private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<string> producer, int numberOfMessages, 
CancellationToken ct)
     {
         var messageIds = new MessageId[numberOfMessages];
 
         for (var i = 0; i < numberOfMessages; ++i)
         {
-            var data = Encoding.UTF8.GetBytes($"Sent #{i} at 
{DateTimeOffset.UtcNow:s}");
-            messageIds[i] = await producer.Send(data, ct);
+            messageIds[i] = await producer.Send($"Sent #{i} at 
{DateTimeOffset.UtcNow:s}", ct);
         }
 
         return messageIds;
     }
 
-    private static async Task<IEnumerable<MessageId>> 
ConsumeMessages(IConsumer<byte[]> consumer, int numberOfMessages, 
CancellationToken ct)
+    private static async Task<IEnumerable<MessageId>> 
ConsumeMessages(IConsumer<string> consumer, int numberOfMessages, 
CancellationToken ct)
     {
         var messageIds = new List<MessageId>(numberOfMessages);
 
@@ -420,6 +340,35 @@ public class ProducerTests
         return messageIds;
     }
 
+    private void LogState(ConsumerStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The consumer for topic 
'{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
+
+    private void LogState(ProducerStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The producer for topic 
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
+    private static string CreateTopicName() => 
$"persistent://public/default/{Guid.NewGuid():N}";
+    private static string CreateConsumerName() => 
$"consumer-{Guid.NewGuid():N}";
+    private static string CreateSubscriptionName() => 
$"subscription-{Guid.NewGuid():N}";
+
+    private IProducer<string> CreateProducer(
+        IPulsarClient pulsarClient,
+        string? topicName = null,
+        ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
+        =>  pulsarClient.NewProducer(Schema.String)
+        .Topic(topicName is null ? CreateTopicName() : topicName)
+        .ProducerAccessMode(producerAccessMode)
+        .StateChangedHandler(LogState)
+        .Create();
+
+    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string? topicName = null)
+        => pulsarClient.NewConsumer(Schema.String)
+        .ConsumerName(CreateConsumerName())
+        .InitialPosition(SubscriptionInitialPosition.Earliest)
+        .SubscriptionName(CreateSubscriptionName())
+        .Topic(topicName is null ? CreateTopicName() : topicName)
+        .StateChangedHandler(LogState)
+        .Create();
+
     private IPulsarClient CreateClient()
         => PulsarClient
             .Builder()
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
similarity index 83%
rename from tests/DotPulsar.Tests/ReaderTests.cs
rename to tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 7d8fb43..968f26d 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -12,7 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Tests;
+namespace DotPulsar.Tests.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
@@ -41,9 +41,8 @@ public class ReaderTests
     public async Task 
GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
     {
         //Arrange
-        const string topicName = 
$"reader-{nameof(GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
         await using var client = CreateClient();
-        await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
+        await using var reader = CreateReader(client, MessageId.Earliest);
 
         //Act
         var actual = await reader.GetLastMessageId();
@@ -56,8 +55,7 @@ public class ReaderTests
     public async Task 
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
     {
         //Arrange
-        const string topicName = 
"persistent://public/default/reader-non-partitioned-topic-get-last-MessageId";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -67,7 +65,7 @@ public class ReaderTests
         MessageId expected = null!;
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send(content);
+            var messageId = await producer.Send("test-message");
             if (i >= 5)
             {
                 expected = messageId;
@@ -85,7 +83,7 @@ public class ReaderTests
     public async Task 
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
     {
         //Arrange
-        const string topicName = 
"persistent://public/default/reader-partitioned-topic-get-last-MessageId";
+        var topicName = CreateTopicName();
         const int partitions = 3;
         _fixture.CreatePartitionedTopic(topicName, partitions);
 
@@ -103,9 +101,8 @@ public class ReaderTests
     public async Task GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
     {
         //Arrange
-        const string topicName = $"reader-{nameof(GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
         await using var client = CreateClient();
-        await using var reader = CreateReader(client, MessageId.Earliest, topicName);
+        await using var reader = CreateReader(client, MessageId.Earliest);
         var expected = new List<MessageId>() { MessageId.Earliest };
 
         //Act
@@ -119,8 +116,7 @@ public class ReaderTests
     public async Task GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        const string topicName = "reader-non-partitioned-topic-get-last-MessageIds";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -130,7 +126,7 @@ public class ReaderTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send(content);
+            var messageId = await producer.Send("test-message");
             if (i >= 5)
             {
                 expected.Add(messageId);
@@ -147,11 +143,10 @@ public class ReaderTests
     public async Task GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdsFromPartitions()
     {
         //Arrange
-        const string topicName = "reader-partitioned-topic-get-last-MessageIds";
-        const string content = "test-message";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", partitions);
+        _fixture.CreatePartitionedTopic(topicName, partitions);
 
         await using var client = CreateClient();
         await using var reader = CreateReader(client, MessageId.Earliest, topicName);
@@ -160,7 +155,7 @@ public class ReaderTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send(content);
+            var messageId = await producer.Send("test-message");
             if (i >= 3)
             {
                 expected.Add(messageId);
@@ -178,7 +173,7 @@ public class ReaderTests
     public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        const string topicName = "Receive_GivenNonPartitionedTopicWithMessages_ShouldReceiveAll";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 10;
 
         await using var client = CreateClient();
@@ -208,10 +203,10 @@ public class ReaderTests
     public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        const string topicName = "reader-with-3-partitions-test";
+        var topicName = CreateTopicName();
         const int numberOfMessages = 50;
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", partitions);
+        _fixture.CreatePartitionedTopic(topicName, partitions);
 
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
@@ -249,7 +244,7 @@ public class ReaderTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = CreateReader(client, MessageId.Earliest, "persistent://a/b/c");
+        await using var reader = CreateReader(client, MessageId.Earliest, CreateTopicName());
 
         var receiveTask = reader.Receive().AsTask();
         semaphoreSlim.Release();
@@ -272,7 +267,7 @@ public class ReaderTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = CreateReader(client, MessageId.Earliest, "persistent://a/b/c");
+        await using var reader = CreateReader(client, MessageId.Earliest, CreateTopicName());
 
         await reader.OnStateChangeTo(ReaderState.Faulted);
 
@@ -289,14 +284,18 @@ public class ReaderTests
     private void LogState(ProducerStateChanged stateChange)
         => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
 
-    private IProducer<String> CreateProducer(IPulsarClient pulsarClient, string topicName) => pulsarClient.NewProducer(Schema.String)
-        .Topic(topicName)
+    private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}";
+
+    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, string? topicName = null)
+        => pulsarClient.NewProducer(Schema.String)
+        .Topic(topicName is null ? CreateTopicName() : topicName)
         .StateChangedHandler(LogState)
         .Create();
 
-    private IReader<String> CreateReader(IPulsarClient pulsarClient, MessageId messageId, string topicName) => pulsarClient.NewReader(Schema.String)
+    private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId messageId, string? topicName = null)
+        => pulsarClient.NewReader(Schema.String)
         .StartMessageId(messageId)
-        .Topic(topicName)
+        .Topic(topicName is null ? CreateTopicName() : topicName)
         .StateChangedHandler(LogState)
         .Create();
 
diff --git a/tests/DotPulsar.Tests/TokenTests.cs b/tests/DotPulsar.Tests/PulsarClientTests.cs
similarity index 85%
rename from tests/DotPulsar.Tests/TokenTests.cs
rename to tests/DotPulsar.Tests/PulsarClientTests.cs
index 3759125..c5956bc 100644
--- a/tests/DotPulsar.Tests/TokenTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -25,14 +25,14 @@ using Xunit;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
-public class TokenTests
+public class PulsarClientTests
 {
     private const string MyTopic = "persistent://public/default/mytopic";
 
     private readonly IntegrationFixture _fixture;
     private readonly ITestOutputHelper _testOutputHelper;
 
-    public TokenTests(IntegrationFixture fixture, ITestOutputHelper outputHelper)
+    public PulsarClientTests(IntegrationFixture fixture, ITestOutputHelper outputHelper)
     {
         _fixture = fixture;
         _testOutputHelper = outputHelper;
@@ -142,22 +142,9 @@ public class TokenTests
         => client
             .NewProducer(Schema.String)
             .Topic(MyTopic)
-            .StateChangedHandler(Monitor)
+            .StateChangedHandler(LogState)
             .Create();
 
-    private void Monitor(ProducerStateChanged stateChanged, CancellationToken _)
-    {
-        var stateMessage = stateChanged.ProducerState switch
-        {
-            ProducerState.Connected => "is connected",
-            ProducerState.Disconnected => "is disconnected",
-            ProducerState.PartiallyConnected => "is partially connected",
-            ProducerState.Closed => "has closed",
-            ProducerState.Faulted => "has faulted",
-            _ => $"has an unknown state '{stateChanged.ProducerState}'"
-        };
-
-        var topic = stateChanged.Producer.Topic;
-        _testOutputHelper.WriteLine($"The producer for topic '{topic}' {stateMessage}");
-    }
+    private void LogState(ProducerStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
 }


Reply via email to