This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 446ccd9 Adding connectivity tests for the producer using ToxiProxy
446ccd9 is described below
commit 446ccd98201af79a803fb005598908a206b5c5a3
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jan 4 13:22:45 2024 +0100
Adding connectivity tests for the producer using ToxiProxy
---
tests/DotPulsar.Tests/IntegrationFixture.cs | 36 ++++++-------
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 69 +++++++++++++++++++++++++
2 files changed, 85 insertions(+), 20 deletions(-)
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 74827a9..9dec12f 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -19,7 +19,6 @@ using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;
using DotPulsar.Abstractions;
using Toxiproxy.Net;
-using Toxiproxy.Net.Toxics;
using Xunit.Abstractions;
using Xunit.Sdk;
@@ -32,17 +31,13 @@ public class IntegrationFixture : IAsyncLifetime
private const int ToxiProxyControlPort = 8474;
private const int ToxiProxyPort = 15124;
private readonly CancellationTokenSource _cts;
-
private readonly IMessageSink _messageSink;
-
private readonly INetwork _network;
private readonly IContainer _pulsarCluster;
private readonly IContainer _toxiProxy;
-
private Client _toxiProxyClient;
private Connection _toxiProxyConnection;
- private Proxy _toxiProxylocalToPulsarCluster;
-
+ private Proxy _toxiProxyPulsarProxy;
private string? _token;
public IntegrationFixture(IMessageSink messageSink)
@@ -94,7 +89,7 @@ public class IntegrationFixture : IAsyncLifetime
ServiceUrl = new
Uri($"pulsar://{_pulsarCluster.Hostname}:{PulsarPort}");
_toxiProxyConnection = new Connection();
_toxiProxyClient = _toxiProxyConnection.Client();
- _toxiProxylocalToPulsarCluster = new Proxy();
+ _toxiProxyPulsarProxy = new Proxy();
}
public Uri ServiceUrl { get; private set; }
@@ -119,17 +114,17 @@ public class IntegrationFixture : IAsyncLifetime
_messageSink.OnMessage(new DiagnosticMessage("Starting Pulsar
Cluster"));
await _pulsarCluster.StartAsync(_cts.Token);
_messageSink.OnMessage(new DiagnosticMessage("The containers has
initiated. Next, we'll configure Toxiproxy mappings."));
- _toxiProxyConnection = new Connection(_toxiProxy.Hostname,
_toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
+ _toxiProxyConnection = new Connection(_toxiProxy.Hostname,
_toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
_toxiProxyClient = _toxiProxyConnection.Client();
- _toxiProxylocalToPulsarCluster = new Proxy
+ _toxiProxyPulsarProxy = new Proxy
{
Name = "localToPulsarCluster",
Enabled = true,
Listen = $"{"0.0.0.0"}:{ToxiProxyPort}",
Upstream = $"{"pulsar"}:{PulsarPort}"
};
- await _toxiProxyClient.AddAsync(_toxiProxylocalToPulsarCluster);
+ await _toxiProxyClient.AddAsync(_toxiProxyPulsarProxy);
_messageSink.OnMessage(new DiagnosticMessage("Toxiproxy successfully
mapped connections between host and the Pulsar Cluster."));
ServiceUrl = new
Uri($"pulsar://{_toxiProxy.Hostname}:{_toxiProxy.GetMappedPublicPort(ToxiProxyPort)}");
@@ -202,22 +197,23 @@ public class IntegrationFixture : IAsyncLifetime
throw new Exception($"Could not create the partitioned topic:
{result.Stderr}");
}
- public async Task SimulateDisconnect()
+ public async Task<IAsyncDisposable> DisableThePulsarConnection()
{
- var timeoutProxy = new TimeoutToxic();
- timeoutProxy.Attributes.Timeout = 1000000000;
- timeoutProxy.Toxicity = 1.0;
- await _toxiProxylocalToPulsarCluster.AddAsync(timeoutProxy);
- await _toxiProxylocalToPulsarCluster.UpdateAsync();
+ _toxiProxyPulsarProxy.Enabled = false;
+ await _toxiProxyPulsarProxy.UpdateAsync();
+ return new Releaser(_toxiProxyPulsarProxy);
}
- public async Task RemoveAllToxins()
+ private class Releaser : IAsyncDisposable
{
- var allToxicsAsync = await
_toxiProxylocalToPulsarCluster.GetAllToxicsAsync();
+ private readonly Proxy _proxy;
+
+ public Releaser(Proxy proxy) => _proxy = proxy;
- foreach (var toxicBase in allToxicsAsync)
+ public async ValueTask DisposeAsync()
{
- await
_toxiProxylocalToPulsarCluster.RemoveToxicAsync(toxicBase.Name);
+ _proxy.Enabled = true;
+ await _proxy.UpdateAsync();
}
}
}
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 545849c..536f4a4 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -268,6 +268,75 @@ public sealed class ProducerTests : IDisposable
foundNonNegativeOne.Should().Be(true);
}
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
+ {
+ //Arrange
+ await using var connectionDown = await
_fixture.DisableThePulsarConnection();
+ await using var client = CreateClient();
+ var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
producer.DisposeAsync().AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToSend()
+ {
+ //Arrange
+ var connectionDown = await _fixture.DisableThePulsarConnection();
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
+
+ //Act
+ await connectionDown.DisposeAsync();
+ await producer.StateChangedTo(ProducerState.Connected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
producer.Send("test", _cts.Token).AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
+ await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+ //Act
+ await using var connectionDown = await
_fixture.DisableThePulsarConnection();
+ await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
producer.DisposeAsync().AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToSend()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ await using var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
+ await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+ //Act
+ await using (await _fixture.DisableThePulsarConnection())
+ {
+ await producer.StateChangedTo(ProducerState.Disconnected,
_cts.Token);
+ }
+ await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ var exception = await Record.ExceptionAsync(() =>
producer.Send("test", _cts.Token).AsTask());
+
+ //Assert
+ exception.Should().BeNull();
+ }
+
private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<string> producer, int numberOfMessages,
CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];