This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b49abe6 Added connectivity tests for Consumer and Reader (#199)
b49abe6 is described below
commit b49abe6479170d8afd77024eff7cc0cc84c72867
Author: entvex <[email protected]>
AuthorDate: Fri Jan 5 15:31:28 2024 +0100
Added connectivity tests for Consumer and Reader (#199)
Co-authored-by: David Jensen <[email protected]>
---
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 75 +++++++++++++++++++++++++
tests/DotPulsar.Tests/Internal/ReaderTests.cs | 75 +++++++++++++++++++++++++
2 files changed, 150 insertions(+)
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index e74fb9b..e60a419 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -192,6 +192,81 @@ public sealed class ConsumerTests : IDisposable
exception.Should().BeOfType<ConsumerFaultedException>();
}
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await ProduceMessages(producer, 1, "test-message", _cts.Token);
+ var connectionDown = await _fixture.DisableThePulsarConnection();
+ await using var consumer = CreateConsumer(client, topicName);
+
+ //Act
+ await connectionDown.DisposeAsync();
+ await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
consumer.Receive(_cts.Token).AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
+ {
+ //Arrange
+ await using var connectionDown = await
_fixture.DisableThePulsarConnection();
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.DisposeAsync().AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
+ await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+
+ //Act
+ await using var connectionDown = await
_fixture.DisableThePulsarConnection();
+ await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
consumer.DisposeAsync().AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToReceive()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await using var consumer = CreateConsumer(client, topicName);
+ await ProduceMessages(producer, 1, "test-message", _cts.Token);
+ await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+
+ //Act
+ await using (await _fixture.DisableThePulsarConnection())
+ {
+ await consumer.StateChangedTo(ConsumerState.Disconnected,
_cts.Token);
+ }
+ await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
consumer.Receive(_cts.Token).AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<string> producer, int numberOfMessages, string
content, CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 7969844..44c81f4 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -212,6 +212,81 @@ public sealed class ReaderTests : IDisposable
exception.Should().BeOfType<ReaderFaultedException>();
}
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await producer.Send("test-message", _cts.Token);
+ var connectionDown = await _fixture.DisableThePulsarConnection();
+ await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+
+ //Act
+ await connectionDown.DisposeAsync();
+ await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
reader.Receive(_cts.Token).AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
+ {
+ //Arrange
+ await using var connectionDown = await
_fixture.DisableThePulsarConnection();
+ await using var client = CreateClient();
+ var reader = CreateReader(client, MessageId.Earliest, await
_fixture.CreateTopic(_cts.Token));
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
reader.DisposeAsync().AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ var reader = CreateReader(client, MessageId.Earliest, await
_fixture.CreateTopic(_cts.Token));
+ await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+
+ //Act
+ await using var connectionDown = await
_fixture.DisableThePulsarConnection();
+ await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
reader.DisposeAsync().AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToReceive()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+ await producer.Send("test-message", _cts.Token);
+ await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+
+ //Act
+ await using (await _fixture.DisableThePulsarConnection())
+ {
+ await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
+ }
+ await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
reader.Receive(_cts.Token).AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName)
=> pulsarClient.NewProducer(Schema.String)
.Topic(topicName)