This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8ef06a Added 3 more Connectivity. But added them as skipped for now. (#200)
a8ef06a is described below
commit a8ef06af0c9fd0d5147333abb0f760c8e67abd06
Author: entvex <[email protected]>
AuthorDate: Fri Jan 12 13:34:58 2024 +0100
Added 3 more Connectivity. But added them as skipped for now. (#200)
Co-authored-by: David Jensen <[email protected]>
---
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 24 ++++++++++++++++++++++++
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 24 ++++++++++++++++++++++++
tests/DotPulsar.Tests/Internal/ReaderTests.cs | 24 ++++++++++++++++++++++++
3 files changed, 72 insertions(+)
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index e60a419..e5d51d1 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -192,6 +192,30 @@ public sealed class ConsumerTests : IDisposable
exception.Should().BeOfType<ConsumerFaultedException>();
}
+ [Fact(Skip = "Skip for now")]
+ public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client, topicName);
+ await using var producer = CreateProducer(client, topicName);
+ await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+ var receiveTask = consumer.Receive(_cts.Token);
+ await using (await _fixture.DisableThePulsarConnection())
+ {
+ await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
+ }
+ await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+ await ProduceMessages(producer, 1, "test-message", _cts.Token);
+
+ //Act
+ var exception = await Record.ExceptionAsync(async () => await receiveTask.AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
{
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 536f4a4..7e0f8f4 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -268,6 +268,30 @@ public sealed class ProducerTests : IDisposable
foundNonNegativeOne.Should().Be(true);
}
+ [Fact(Skip = "Skip for now")]
+ public async Task Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeAbleToSendWhileDown()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName);
+ await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+ ValueTask<MessageId> sendTask;
+ await using (await _fixture.DisableThePulsarConnection())
+ {
+ await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
+ sendTask = producer.Send("test", _cts.Token);
+ }
+ await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+ //Act
+ var exception = await Record.ExceptionAsync(async () => await sendTask.AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
{
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 44c81f4..504b679 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -212,6 +212,30 @@ public sealed class ReaderTests : IDisposable
exception.Should().BeOfType<ReaderFaultedException>();
}
+ [Fact(Skip = "Skip for now")]
+ public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+ {
+ //Arrange
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ await using var client = CreateClient();
+ await using var reader = CreateReader(client, MessageId.Earliest, topicName);
+ await using var producer = CreateProducer(client, topicName);
+ await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
+ var receiveTask = reader.Receive(_cts.Token);
+ await using (await _fixture.DisableThePulsarConnection())
+ {
+ await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
+ }
+ await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
+ await producer.Send("test-message", _cts.Token);
+
+ //Act
+ var exception = await Record.ExceptionAsync(async () => await receiveTask.AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
{