This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a8ef06a  Added 3 more Connectivity. But added them as skipped for now. 
(#200)
a8ef06a is described below

commit a8ef06af0c9fd0d5147333abb0f760c8e67abd06
Author: entvex <[email protected]>
AuthorDate: Fri Jan 12 13:34:58 2024 +0100

    Added 3 more Connectivity. But added them as skipped for now. (#200)
    
    Co-authored-by: David Jensen <[email protected]>
---
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 24 ++++++++++++++++++++++++
 tests/DotPulsar.Tests/Internal/ProducerTests.cs | 24 ++++++++++++++++++++++++
 tests/DotPulsar.Tests/Internal/ReaderTests.cs   | 24 ++++++++++++++++++++++++
 3 files changed, 72 insertions(+)

diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index e60a419..e5d51d1 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -192,6 +192,30 @@ public sealed class ConsumerTests : IDisposable
         exception.Should().BeOfType<ConsumerFaultedException>();
     }
 
+    [Fact(Skip = "Skip for now")]
+    public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var consumer = CreateConsumer(client, topicName);
+        await using var producer = CreateProducer(client, topicName);
+        await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+        var receiveTask = consumer.Receive(_cts.Token);
+        await using (await _fixture.DisableThePulsarConnection())
+        {
+            await consumer.StateChangedTo(ConsumerState.Disconnected, 
_cts.Token);
+        }
+        await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+        await ProduceMessages(producer, 1, "test-message", _cts.Token);
+
+        //Act
+        var exception = await Record.ExceptionAsync(async () => await 
receiveTask.AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
     [Fact]
     public async Task 
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
     {
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs 
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 536f4a4..7e0f8f4 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -268,6 +268,30 @@ public sealed class ProducerTests : IDisposable
         foundNonNegativeOne.Should().Be(true);
     }
 
+    [Fact(Skip = "Skip for now")]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeAbleToSendWhileDown()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, topicName);
+        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+        ValueTask<MessageId> sendTask;
+        await using (await _fixture.DisableThePulsarConnection())
+        {
+            await producer.StateChangedTo(ProducerState.Disconnected, 
_cts.Token);
+            sendTask = producer.Send("test", _cts.Token);
+        }
+        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+        //Act
+        var exception = await Record.ExceptionAsync(async () => await 
sendTask.AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
     [Fact]
     public async Task 
Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
     {
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs 
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 44c81f4..504b679 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -212,6 +212,30 @@ public sealed class ReaderTests : IDisposable
         exception.Should().BeOfType<ReaderFaultedException>();
     }
 
+    [Fact(Skip = "Skip for now")]
+    public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        await using var client = CreateClient();
+        await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
+        await using var producer = CreateProducer(client, topicName);
+        await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
+        var receiveTask = reader.Receive(_cts.Token);
+        await using (await _fixture.DisableThePulsarConnection())
+        {
+            await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
+        }
+        await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
+        await producer.Send("test-message", _cts.Token);
+
+        //Act
+        var exception = await Record.ExceptionAsync(async () => await 
receiveTask.AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
     [Fact]
     public async Task 
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
     {

Reply via email to