This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0cb94d8 Refactor tests
0cb94d8 is described below
commit 0cb94d81d8f072f9a142399966e2b291b9c4e533
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Sep 20 12:42:37 2023 +0200
Refactor tests
---
.../{ => Internal}/ConsumerTests.cs | 106 +++++------
.../{ => Internal}/ProducerTests.cs | 197 ++++++++-------------
.../DotPulsar.Tests/{ => Internal}/ReaderTests.cs | 49 +++--
.../{TokenTests.cs => PulsarClientTests.cs} | 23 +--
4 files changed, 143 insertions(+), 232 deletions(-)
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
similarity index 63%
rename from tests/DotPulsar.Tests/ConsumerTests.cs
rename to tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 4f7414d..3cd6d8b 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Tests;
+namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
@@ -41,11 +41,8 @@ public class ConsumerTests
public async Task
GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
{
//Arrange
- const string topicName =
$"consumer-{nameof(GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
- const string subscriptionName = "subscription-given-given-empty-topic";
- const string consumerName = $"consumer-given-empty-topic";
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client);
//Act
var actual = await consumer.GetLastMessageId();
@@ -58,20 +55,17 @@ public class ConsumerTests
public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- const string topicName =
"persistent://public/default/consumer-getlastmessageid-given-non-partitioned-topic";
- const string subscriptionName =
"subscription-given-given-partitioned-topic";
- const string consumerName = $"consumer-given-partitioned-topic";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 6;
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
MessageId expected = null!;
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send(content);
+ var messageId = await producer.Send("test-message");
if (i >= 5)
{
expected = messageId;
@@ -89,14 +83,12 @@ public class ConsumerTests
public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
{
//Arrange
- const string topicName =
"persistent://public/default/consumer-getlastmessageid-given-partitioned-topic";
- const string subscriptionName =
"subscription-given-given-partitioned-topic";
- const string consumerName = "consumer-given-partitioned-topic";
+ var topicName = CreateTopicName();
const int partitions = 3;
_fixture.CreatePartitionedTopic(topicName, partitions);
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client, topicName);
//Act
var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
@@ -109,20 +101,17 @@ public class ConsumerTests
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- const string topicName =
"consumer-getlastmessageids-given-non-partitioned-topic";
- const string subscriptionName = "subscription-should_have-3-topics";
- const string consumerName = "consumer-should_have-3-topics";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 6;
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send(content);
+ var messageId = await producer.Send("test-message");
if (i >= 5)
{
expected.Add(messageId);
@@ -140,22 +129,19 @@ public class ConsumerTests
public async Task
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
{
//Arrange
- const string topicName =
"consumer-getlastmessageids-given-partitioned-topic";
- const string subscriptionName = "subscription-should_have-3-topics";
- const string consumerName = "consumer-should_have-3-topics";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 6;
const int partitions = 3;
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ _fixture.CreatePartitionedTopic(topicName, partitions);
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send(content);
+ var messageId = await producer.Send("test-message");
if (i >= 3)
{
expected.Add(messageId);
@@ -173,11 +159,8 @@ public class ConsumerTests
public async Task
GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
{
//Arrange
- const string topicName =
$"consumer-{nameof(GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
- const string subscriptionName = "subscription-given-given-empty-topic";
- const string consumerName = $"consumer-given-empty-topic";
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client);
var expected = new List<MessageId>() { MessageId.Earliest };
//Act
@@ -191,20 +174,17 @@ public class ConsumerTests
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- const string topicName =
"persistent://public/default/consumer-given-topic-with-messages";
- const string subscriptionName =
"subscription-given-topic-with-messages";
- const string consumerName = "consumer-given-topic-with-messages";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 10000;
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
//Act
- var produced = await ProduceMessages(producer, numberOfMessages,
content, cts.Token);
+ var produced = await ProduceMessages(producer, numberOfMessages,
"test-message", cts.Token);
var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
//Assert
@@ -215,23 +195,20 @@ public class ConsumerTests
public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- const string subscriptionName =
"subscription-given-non-partitioned-topic-with-messages";
- const string consumerName =
"consumer-given-non-partitioned-topic-with-messages";
- const string topicName = "consumer-with-3-partitions-test";
- const string content = "test-message";
- const int numberOfMessages = 10000;
+ var topicName = CreateTopicName();
+ const int numberOfMessages = 1000;
const int partitions = 3;
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ _fixture.CreatePartitionedTopic(topicName, partitions);
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
//Act
- var produced = await ProduceMessages(producer, numberOfMessages,
content, cts.Token);
+ var produced = await ProduceMessages(producer, numberOfMessages,
"test-message", cts.Token);
var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
//Assert
@@ -251,7 +228,7 @@ public class ConsumerTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
+ await using var consumer = CreateConsumer(client);
var receiveTask = consumer.Receive().AsTask();
semaphoreSlim.Release();
@@ -274,7 +251,7 @@ public class ConsumerTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
+ await using var consumer = CreateConsumer(client);
await consumer.OnStateChangeTo(ConsumerState.Faulted);
@@ -322,31 +299,30 @@ public class ConsumerTests
private void LogState(ProducerStateChanged stateChange)
=> _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
- private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName)
+ private static string CreateTopicName() =>
$"persistent://public/default/{Guid.NewGuid():N}";
+ private static string CreateConsumerName() =>
$"consumer-{Guid.NewGuid():N}";
+ private static string CreateSubscriptionName() =>
$"subscription-{Guid.NewGuid():N}";
+
+ private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string? topicName = null)
=> pulsarClient.NewProducer(Schema.String)
- .Topic(topicName)
+ .Topic(topicName is null ? CreateTopicName() : topicName)
.StateChangedHandler(LogState)
.Create();
- private IConsumer<string> CreateConsumer(
- IPulsarClient pulsarClient,
- SubscriptionInitialPosition subscriptionInitialPosition,
- string topicName,
- string consumerName,
- string subscriptionName)
+ private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string? topicName = null)
=> pulsarClient.NewConsumer(Schema.String)
- .ConsumerName(consumerName)
- .InitialPosition(subscriptionInitialPosition)
- .SubscriptionName(subscriptionName)
- .Topic(topicName)
+ .ConsumerName(CreateConsumerName())
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName(CreateSubscriptionName())
+ .Topic(topicName is null ? CreateTopicName() : topicName)
.StateChangedHandler(LogState)
.Create();
private IPulsarClient CreateClient()
=> PulsarClient
- .Builder()
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception:
{ec.Exception}"))
- .ServiceUrl(_fixture.ServiceUrl)
- .Build();
+ .Builder()
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception:
{ec.Exception}"))
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Build();
}
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
similarity index 61%
rename from tests/DotPulsar.Tests/ProducerTests.cs
rename to tests/DotPulsar.Tests/Internal/ProducerTests.cs
index a7b8dca..b238409 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -12,14 +12,13 @@
* limitations under the License.
*/
-namespace DotPulsar.Tests;
+namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using FluentAssertions;
using System;
using System.Collections.Generic;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
@@ -42,48 +41,32 @@ public class ProducerTests
public async Task
SimpleProduceConsume_WhenSendingMessagesToProducer_ThenReceiveMessagesFromConsumer()
{
//Arrange
- await using var client = CreateClient();
- var topicName = $"simple-produce-consume{Guid.NewGuid():N}";
const string content = "test-message";
+ var topicName = CreateTopicName();
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await using var consumer = CreateConsumer(client, topicName);
//Act
- await using var producer = client.NewProducer(Schema.String)
- .Topic(topicName)
- .Create();
-
- await using var consumer = client.NewConsumer(Schema.String)
- .Topic(topicName)
- .SubscriptionName("test-sub")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .Create();
-
await producer.Send(content);
- _testOutputHelper.WriteLine($"Sent a message: {content}");
+ var message = await consumer.Receive();
//Assert
- (await consumer.Receive()).Value().Should().Be(content);
+ message.Value().Should().Be(content);
}
[Fact]
public async Task
SimpleProduceConsume_WhenSendingWithChannel_ThenReceiveMessagesFromConsumer()
{
//Arrange
- await using var client = CreateClient();
- var topicName = $"simple-produce-consume{Guid.NewGuid():N}";
const string content = "test-message";
+ var topicName = CreateTopicName();
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await using var consumer = CreateConsumer(client, topicName);
const int msgCount = 3;
//Act
- await using var producer = client.NewProducer(Schema.String)
- .Topic(topicName)
- .Create();
-
- await using var consumer = client.NewConsumer(Schema.String)
- .Topic(topicName)
- .SubscriptionName("test-sub")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .Create();
-
for (var i = 0; i < msgCount; i++)
{
await producer.SendChannel.Send(content);
@@ -109,28 +92,19 @@ public class ProducerTests
public async Task
TwoProducers_WhenConnectingSecond_ThenGoToExpectedState(ProducerAccessMode
accessMode, ProducerState expectedState)
{
//Arrange
- await using var client = CreateClient();
var topicName = $"producer-access-mode{Guid.NewGuid():N}";
var cts = new CancellationTokenSource(TestTimeout);
-
- await using var producer1 = client.NewProducer(Schema.String)
- .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 1
changed to state: {x.ProducerState}"))
- .ProducerAccessMode(accessMode)
- .Topic(topicName)
- .Create();
- await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
+ await using var client = CreateClient();
//Act
- await using var producer2 = client.NewProducer(Schema.String)
- .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 2
changed to state: {x.ProducerState}"))
- .ProducerAccessMode(accessMode)
- .Topic(topicName)
- .Create();
+ await using var producer1 = CreateProducer(client, topicName,
accessMode);
+ _ = await producer1.OnStateChangeTo(ProducerState.Connected,
cts.Token);
- var result = await producer2.OnStateChangeTo(expectedState, cts.Token);
+ await using var producer2 = CreateProducer(client, topicName,
accessMode);
+ var actualState = await producer2.OnStateChangeTo(expectedState,
cts.Token);
//Assert
- result.Should().Be(expectedState);
+ actualState.Should().Be(expectedState);
}
[Fact]
@@ -138,23 +112,16 @@ public class ProducerTests
{
//Arrange
await using var client = CreateClient();
- var topicName = $"producer-access-mode{Guid.NewGuid():N}";
+ var topicName = CreateTopicName();
var cts = new CancellationTokenSource(TestTimeout);
- await using var producer1 = client.NewProducer(Schema.String)
- .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 1
changed to state: {x.ProducerState}"))
- .ProducerAccessMode(ProducerAccessMode.ExclusiveWithFencing)
- .Topic(topicName)
- .Create();
+ await using var producer1 = CreateProducer(client, topicName,
ProducerAccessMode.ExclusiveWithFencing);
await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
//Act
- await using var producer2 = client.NewProducer(Schema.String)
- .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 2
changed to state: {x.ProducerState}"))
- .ProducerAccessMode(ProducerAccessMode.ExclusiveWithFencing)
- .Topic(topicName)
- .Create();
+ await using var producer2 = CreateProducer(client, topicName,
ProducerAccessMode.ExclusiveWithFencing);
await producer2.OnStateChangeTo(ProducerState.Connected, cts.Token);
+
try
{
// We need to send a message to trigger the disconnect
@@ -179,23 +146,15 @@ public class ProducerTests
public async Task
TwoProducers_WhenUsingDifferentAccessModes_ThenGoToExpectedStates(ProducerAccessMode
accessMode1, ProducerAccessMode accessMode2, ProducerState producerState1,
ProducerState producerState2)
{
//Arrange
- await using var client = CreateClient();
- var topicName = $"producer-access-mode{Guid.NewGuid():N}";
+ var topicName = CreateTopicName();
var cts = new CancellationTokenSource(TestTimeout);
+ await using var client = CreateClient();
- await using var producer1 = client.NewProducer(Schema.String)
- .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 1
changed to state: {x.ProducerState}"))
- .ProducerAccessMode(accessMode1)
- .Topic(topicName)
- .Create();
+ await using var producer1 = CreateProducer(client, topicName,
accessMode1);
await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
//Act
- await using var producer2 = client.NewProducer(Schema.String)
- .StateChangedHandler(x => _testOutputHelper.WriteLine($"Producer 2
changed to state: {x.ProducerState}"))
- .ProducerAccessMode(accessMode2)
- .Topic(topicName)
- .Create();
+ await using var producer2 = CreateProducer(client, topicName,
accessMode2);
var result1 = await producer1.OnStateChangeTo(producerState1,
cts.Token);
var result2 = await producer2.OnStateChangeTo(producerState2,
cts.Token);
@@ -212,19 +171,15 @@ public class ProducerTests
const string content = "test-message";
const int partitions = 3;
const int msgCount = 3;
- var topicName = $"single-partitioned-{Guid.NewGuid():N}";
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ var topicName = CreateTopicName();
+ _fixture.CreatePartitionedTopic(topicName, partitions);
await using var client = CreateClient();
//Act
var consumers = new List<IConsumer<string>>();
for (var i = 0; i < partitions; ++i)
{
- consumers.Add(client.NewConsumer(Schema.String)
- .Topic($"{topicName}-partition-{i}")
- .SubscriptionName("test-sub")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .Create());
+ consumers.Add(CreateConsumer(client,
$"{topicName}-partition-{i}"));
}
for (var i = 0; i < partitions; ++i)
@@ -261,25 +216,19 @@ public class ProducerTests
//Arrange
await using var client = CreateClient();
- var topicName = $"round-robin-partitioned-{Guid.NewGuid():N}";
+ var topicName = CreateTopicName();
const string content = "test-message";
const int partitions = 3;
var consumers = new List<IConsumer<string>>();
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ _fixture.CreatePartitionedTopic(topicName, partitions);
//Act
- await using var producer = client.NewProducer(Schema.String)
- .Topic(topicName)
- .Create();
+ await using var producer = CreateProducer(client, topicName);
for (var i = 0; i < partitions; ++i)
{
- consumers.Add(client.NewConsumer(Schema.String)
- .Topic($"{topicName}-partition-{i}")
- .SubscriptionName("test-sub")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .Create());
+ consumers.Add(CreateConsumer(client,
$"{topicName}-partition-{i}"));
await producer.Send($"{content}-{i}");
_testOutputHelper.WriteLine($"Sent a message to consumer [{i}]");
}
@@ -295,26 +244,12 @@ public class ProducerTests
public async Task
Send_WhenProducingMessagesForOnePartition_ShouldPartitionOnlyBeNegativeOne()
{
//Arrange
- var testRunId = Guid.NewGuid().ToString("N");
const int numberOfMessages = 10;
- var topic = $"persistent://public/default/producer-test-{testRunId}";
-
- await using var client = PulsarClient.Builder()
- .ServiceUrl(_fixture.ServiceUrl)
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .Build();
+ var topicName = CreateTopicName();
- await using var consumer = client.NewConsumer(Schema.ByteArray)
- .ConsumerName($"consumer-{testRunId}")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName($"subscription-{testRunId}")
- .Topic(topic)
- .Create();
-
- await using var producer = client.NewProducer(Schema.ByteArray)
- .ProducerName($"producer-{testRunId}")
- .Topic(topic)
- .Create();
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, topicName);
+ await using var producer = CreateProducer(client, topicName);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
@@ -342,29 +277,15 @@ public class ProducerTests
public async Task
Send_WhenProducingMessagesForFourPartitions_ShouldPartitionBeDifferentThanNegativeOne()
{
//Arrange
- var testRunId = Guid.NewGuid().ToString("N");
const int numberOfMessages = 10;
const int partitions = 4;
- var topicName = $"producer-with-4-partitions-test";
+ var topicName = CreateTopicName();
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ _fixture.CreatePartitionedTopic(topicName, partitions);
- await using var client = PulsarClient.Builder()
- .ServiceUrl(_fixture.ServiceUrl)
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .Build();
-
- await using var consumer = client.NewConsumer(Schema.ByteArray)
- .ConsumerName($"consumer-{testRunId}")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName($"subscription-{testRunId}")
- .Topic(topicName)
- .Create();
-
- await using var producer = client.NewProducer(Schema.ByteArray)
- .ProducerName($"producer-{testRunId}")
- .Topic(topicName)
- .Create();
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, topicName);
+ await using var producer = CreateProducer(client, topicName);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
@@ -388,20 +309,19 @@ public class ProducerTests
foundNonNegativeOne.Should().Be(true);
}
- private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<byte[]> producer, int numberOfMessages,
CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<string> producer, int numberOfMessages,
CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
for (var i = 0; i < numberOfMessages; ++i)
{
- var data = Encoding.UTF8.GetBytes($"Sent #{i} at
{DateTimeOffset.UtcNow:s}");
- messageIds[i] = await producer.Send(data, ct);
+ messageIds[i] = await producer.Send($"Sent #{i} at
{DateTimeOffset.UtcNow:s}", ct);
}
return messageIds;
}
- private static async Task<IEnumerable<MessageId>>
ConsumeMessages(IConsumer<byte[]> consumer, int numberOfMessages,
CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>>
ConsumeMessages(IConsumer<string> consumer, int numberOfMessages,
CancellationToken ct)
{
var messageIds = new List<MessageId>(numberOfMessages);
@@ -420,6 +340,35 @@ public class ProducerTests
return messageIds;
}
+ private void LogState(ConsumerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The consumer for topic
'{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
+
+ private void LogState(ProducerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
+ private static string CreateTopicName() =>
$"persistent://public/default/{Guid.NewGuid():N}";
+ private static string CreateConsumerName() =>
$"consumer-{Guid.NewGuid():N}";
+ private static string CreateSubscriptionName() =>
$"subscription-{Guid.NewGuid():N}";
+
+ private IProducer<string> CreateProducer(
+ IPulsarClient pulsarClient,
+ string? topicName = null,
+ ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
+ => pulsarClient.NewProducer(Schema.String)
+ .Topic(topicName is null ? CreateTopicName() : topicName)
+ .ProducerAccessMode(producerAccessMode)
+ .StateChangedHandler(LogState)
+ .Create();
+
+ private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string? topicName = null)
+ => pulsarClient.NewConsumer(Schema.String)
+ .ConsumerName(CreateConsumerName())
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName(CreateSubscriptionName())
+ .Topic(topicName is null ? CreateTopicName() : topicName)
+ .StateChangedHandler(LogState)
+ .Create();
+
private IPulsarClient CreateClient()
=> PulsarClient
.Builder()
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
similarity index 83%
rename from tests/DotPulsar.Tests/ReaderTests.cs
rename to tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 7d8fb43..968f26d 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Tests;
+namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
@@ -41,9 +41,8 @@ public class ReaderTests
public async Task
GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
{
//Arrange
- const string topicName =
$"reader-{nameof(GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
await using var client = CreateClient();
- await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+ await using var reader = CreateReader(client, MessageId.Earliest);
//Act
var actual = await reader.GetLastMessageId();
@@ -56,8 +55,7 @@ public class ReaderTests
public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
{
//Arrange
- const string topicName =
"persistent://public/default/reader-non-partitioned-topic-get-last-MessageId";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -67,7 +65,7 @@ public class ReaderTests
MessageId expected = null!;
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send(content);
+ var messageId = await producer.Send("test-message");
if (i >= 5)
{
expected = messageId;
@@ -85,7 +83,7 @@ public class ReaderTests
public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
{
//Arrange
- const string topicName =
"persistent://public/default/reader-partitioned-topic-get-last-MessageId";
+ var topicName = CreateTopicName();
const int partitions = 3;
_fixture.CreatePartitionedTopic(topicName, partitions);
@@ -103,9 +101,8 @@ public class ReaderTests
public async Task
GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
{
//Arrange
- const string topicName =
$"reader-{nameof(GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
await using var client = CreateClient();
- await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+ await using var reader = CreateReader(client, MessageId.Earliest);
var expected = new List<MessageId>() { MessageId.Earliest };
//Act
@@ -119,8 +116,7 @@ public class ReaderTests
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- const string topicName =
"reader-non-partitioned-topic-get-last-MessageIds";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -130,7 +126,7 @@ public class ReaderTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send(content);
+ var messageId = await producer.Send("test-message");
if (i >= 5)
{
expected.Add(messageId);
@@ -147,11 +143,10 @@ public class ReaderTests
public async Task
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdsFromPartitions()
{
//Arrange
- const string topicName =
"reader-partitioned-topic-get-last-MessageIds";
- const string content = "test-message";
+ var topicName = CreateTopicName();
const int numberOfMessages = 6;
const int partitions = 3;
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ _fixture.CreatePartitionedTopic(topicName, partitions);
await using var client = CreateClient();
await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
@@ -160,7 +155,7 @@ public class ReaderTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send(content);
+ var messageId = await producer.Send("test-message");
if (i >= 3)
{
expected.Add(messageId);
@@ -178,7 +173,7 @@ public class ReaderTests
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- const string topicName =
"Receive_GivenNonPartitionedTopicWithMessages_ShouldReceiveAll";
+ var topicName = CreateTopicName();
const int numberOfMessages = 10;
await using var client = CreateClient();
@@ -208,10 +203,10 @@ public class ReaderTests
public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- const string topicName = "reader-with-3-partitions-test";
+ var topicName = CreateTopicName();
const int numberOfMessages = 50;
const int partitions = 3;
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+ _fixture.CreatePartitionedTopic(topicName, partitions);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
@@ -249,7 +244,7 @@ public class ReaderTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = CreateReader(client, MessageId.Earliest,
"persistent://a/b/c");
+ await using var reader = CreateReader(client, MessageId.Earliest,
CreateTopicName());
var receiveTask = reader.Receive().AsTask();
semaphoreSlim.Release();
@@ -272,7 +267,7 @@ public class ReaderTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = CreateReader(client, MessageId.Earliest,
"persistent://a/b/c");
+ await using var reader = CreateReader(client, MessageId.Earliest,
CreateTopicName());
await reader.OnStateChangeTo(ReaderState.Faulted);
@@ -289,14 +284,18 @@ public class ReaderTests
private void LogState(ProducerStateChanged stateChange)
=> _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
- private IProducer<String> CreateProducer(IPulsarClient pulsarClient,
string topicName) => pulsarClient.NewProducer(Schema.String)
- .Topic(topicName)
+ private static string CreateTopicName() =>
$"persistent://public/default/{Guid.NewGuid():N}";
+
+ private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string? topicName = null)
+ => pulsarClient.NewProducer(Schema.String)
+ .Topic(topicName is null ? CreateTopicName() : topicName)
.StateChangedHandler(LogState)
.Create();
- private IReader<String> CreateReader(IPulsarClient pulsarClient, MessageId
messageId, string topicName) => pulsarClient.NewReader(Schema.String)
+ private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId
messageId, string? topicName = null)
+ => pulsarClient.NewReader(Schema.String)
.StartMessageId(messageId)
- .Topic(topicName)
+ .Topic(topicName is null ? CreateTopicName() : topicName)
.StateChangedHandler(LogState)
.Create();
diff --git a/tests/DotPulsar.Tests/TokenTests.cs b/tests/DotPulsar.Tests/PulsarClientTests.cs
similarity index 85%
rename from tests/DotPulsar.Tests/TokenTests.cs
rename to tests/DotPulsar.Tests/PulsarClientTests.cs
index 3759125..c5956bc 100644
--- a/tests/DotPulsar.Tests/TokenTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -25,14 +25,14 @@ using Xunit;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
-public class TokenTests
+public class PulsarClientTests
{
private const string MyTopic = "persistent://public/default/mytopic";
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
- public TokenTests(IntegrationFixture fixture, ITestOutputHelper
outputHelper)
+ public PulsarClientTests(IntegrationFixture fixture, ITestOutputHelper
outputHelper)
{
_fixture = fixture;
_testOutputHelper = outputHelper;
@@ -142,22 +142,9 @@ public class TokenTests
=> client
.NewProducer(Schema.String)
.Topic(MyTopic)
- .StateChangedHandler(Monitor)
+ .StateChangedHandler(LogState)
.Create();
- private void Monitor(ProducerStateChanged stateChanged, CancellationToken
_)
- {
- var stateMessage = stateChanged.ProducerState switch
- {
- ProducerState.Connected => "is connected",
- ProducerState.Disconnected => "is disconnected",
- ProducerState.PartiallyConnected => "is partially connected",
- ProducerState.Closed => "has closed",
- ProducerState.Faulted => "has faulted",
- _ => $"has an unknown state '{stateChanged.ProducerState}'"
- };
-
- var topic = stateChanged.Producer.Topic;
- _testOutputHelper.WriteLine($"The producer for topic '{topic}'
{stateMessage}");
- }
+ private void LogState(ProducerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
}