This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b49abe6  Added connectivity tests for Consumer and Reader (#199)
b49abe6 is described below

commit b49abe6479170d8afd77024eff7cc0cc84c72867
Author: entvex <[email protected]>
AuthorDate: Fri Jan 5 15:31:28 2024 +0100

    Added connectivity tests for Consumer and Reader (#199)
    
    Co-authored-by: David Jensen <[email protected]>
---
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 75 +++++++++++++++++++++++++
 tests/DotPulsar.Tests/Internal/ReaderTests.cs   | 75 +++++++++++++++++++++++++
 2 files changed, 150 insertions(+)

diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index e74fb9b..e60a419 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -192,6 +192,81 @@ public sealed class ConsumerTests : IDisposable
         exception.Should().BeOfType<ConsumerFaultedException>();
     }
 
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await ProduceMessages(producer, 1, "test-message", _cts.Token);
+        var connectionDown = await _fixture.DisableThePulsarConnection();
+        await using var consumer = CreateConsumer(client, topicName);
+
+        //Act
+        await connectionDown.DisposeAsync();
+        await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => 
consumer.Receive(_cts.Token).AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
+    {
+        //Arrange
+        await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => 
consumer.DisposeAsync().AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
+    {
+        //Arrange
+        await using var client = CreateClient();
+        var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
+        await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+
+        //Act
+        await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
+        await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => 
consumer.DisposeAsync().AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToReceive()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await using var consumer = CreateConsumer(client, topicName);
+        await ProduceMessages(producer, 1, "test-message", _cts.Token);
+        await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+
+        //Act
+        await using (await _fixture.DisableThePulsarConnection())
+        {
+            await consumer.StateChangedTo(ConsumerState.Disconnected, 
_cts.Token);
+        }
+        await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => 
consumer.Receive(_cts.Token).AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
     private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<string> producer, int numberOfMessages, string 
content, CancellationToken ct)
     {
         var messageIds = new MessageId[numberOfMessages];
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 7969844..44c81f4 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -212,6 +212,81 @@ public sealed class ReaderTests : IDisposable
         exception.Should().BeOfType<ReaderFaultedException>();
     }
 
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await producer.Send("test-message", _cts.Token);
+        var connectionDown = await _fixture.DisableThePulsarConnection();
+        await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
+
+        //Act
+        await connectionDown.DisposeAsync();
+        await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => 
reader.Receive(_cts.Token).AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
+    {
+        //Arrange
+        await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
+        await using var client = CreateClient();
+        var reader = CreateReader(client, MessageId.Earliest, await 
_fixture.CreateTopic(_cts.Token));
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => 
reader.DisposeAsync().AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
+    {
+        //Arrange
+        await using var client = CreateClient();
+        var reader = CreateReader(client, MessageId.Earliest, await 
_fixture.CreateTopic(_cts.Token));
+        await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+
+        //Act
+        await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
+        await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => 
reader.DisposeAsync().AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToReceive()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
+        await producer.Send("test-message", _cts.Token);
+        await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+
+        //Act
+        await using (await _fixture.DisableThePulsarConnection())
+        {
+            await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
+        }
+        await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => 
reader.Receive(_cts.Token).AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
     private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string topicName)
         => pulsarClient.NewProducer(Schema.String)
         .Topic(topicName)

Reply via email to