This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 446ccd9  Adding connectivity tests for the producer using ToxiProxy
446ccd9 is described below

commit 446ccd98201af79a803fb005598908a206b5c5a3
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jan 4 13:22:45 2024 +0100

    Adding connectivity tests for the producer using ToxiProxy
---
 tests/DotPulsar.Tests/IntegrationFixture.cs     | 36 ++++++-------
 tests/DotPulsar.Tests/Internal/ProducerTests.cs | 69 +++++++++++++++++++++++++
 2 files changed, 85 insertions(+), 20 deletions(-)

diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 74827a9..9dec12f 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -19,7 +19,6 @@ using DotNet.Testcontainers.Containers;
 using DotNet.Testcontainers.Networks;
 using DotPulsar.Abstractions;
 using Toxiproxy.Net;
-using Toxiproxy.Net.Toxics;
 using Xunit.Abstractions;
 using Xunit.Sdk;
 
@@ -32,17 +31,13 @@ public class IntegrationFixture : IAsyncLifetime
     private const int ToxiProxyControlPort = 8474;
     private const int ToxiProxyPort = 15124;
     private readonly CancellationTokenSource _cts;
-
     private readonly IMessageSink _messageSink;
-
     private readonly INetwork _network;
     private readonly IContainer _pulsarCluster;
     private readonly IContainer _toxiProxy;
-
     private Client _toxiProxyClient;
     private Connection _toxiProxyConnection;
-    private Proxy _toxiProxylocalToPulsarCluster;
-
+    private Proxy _toxiProxyPulsarProxy;
     private string? _token;
 
     public IntegrationFixture(IMessageSink messageSink)
@@ -94,7 +89,7 @@ public class IntegrationFixture : IAsyncLifetime
         ServiceUrl = new Uri($"pulsar://{_pulsarCluster.Hostname}:{PulsarPort}");
         _toxiProxyConnection = new Connection();
         _toxiProxyClient = _toxiProxyConnection.Client();
-        _toxiProxylocalToPulsarCluster = new Proxy();
+        _toxiProxyPulsarProxy = new Proxy();
     }
 
     public Uri ServiceUrl { get; private set; }
@@ -119,17 +114,17 @@ public class IntegrationFixture : IAsyncLifetime
         _messageSink.OnMessage(new DiagnosticMessage("Starting Pulsar Cluster"));
         await _pulsarCluster.StartAsync(_cts.Token);
         _messageSink.OnMessage(new DiagnosticMessage("The containers has initiated. Next, we'll configure Toxiproxy mappings."));
-        _toxiProxyConnection = new Connection(_toxiProxy.Hostname, _toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
 
+        _toxiProxyConnection = new Connection(_toxiProxy.Hostname, _toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
         _toxiProxyClient = _toxiProxyConnection.Client();
-        _toxiProxylocalToPulsarCluster = new Proxy
+        _toxiProxyPulsarProxy = new Proxy
         {
             Name = "localToPulsarCluster",
             Enabled = true,
             Listen = $"{"0.0.0.0"}:{ToxiProxyPort}",
             Upstream = $"{"pulsar"}:{PulsarPort}"
         };
-        await _toxiProxyClient.AddAsync(_toxiProxylocalToPulsarCluster);
+        await _toxiProxyClient.AddAsync(_toxiProxyPulsarProxy);
 
         _messageSink.OnMessage(new DiagnosticMessage("Toxiproxy successfully mapped connections between host and the Pulsar Cluster."));
         ServiceUrl = new Uri($"pulsar://{_toxiProxy.Hostname}:{_toxiProxy.GetMappedPublicPort(ToxiProxyPort)}");
@@ -202,22 +197,23 @@ public class IntegrationFixture : IAsyncLifetime
             throw new Exception($"Could not create the partitioned topic: {result.Stderr}");
     }
 
-    public async Task SimulateDisconnect()
+    public async Task<IAsyncDisposable> DisableThePulsarConnection()
     {
-        var timeoutProxy = new TimeoutToxic();
-        timeoutProxy.Attributes.Timeout = 1000000000;
-        timeoutProxy.Toxicity = 1.0;
-        await _toxiProxylocalToPulsarCluster.AddAsync(timeoutProxy);
-        await _toxiProxylocalToPulsarCluster.UpdateAsync();
+        _toxiProxyPulsarProxy.Enabled = false;
+        await _toxiProxyPulsarProxy.UpdateAsync();
+        return new Releaser(_toxiProxyPulsarProxy);
     }
 
-    public async Task RemoveAllToxins()
+    private class Releaser : IAsyncDisposable
     {
-        var allToxicsAsync = await _toxiProxylocalToPulsarCluster.GetAllToxicsAsync();
+        private readonly Proxy _proxy;
+
+        public Releaser(Proxy proxy) => _proxy = proxy;
 
-        foreach (var toxicBase in allToxicsAsync)
+        public async ValueTask DisposeAsync()
         {
-            await _toxiProxylocalToPulsarCluster.RemoveToxicAsync(toxicBase.Name);
+            _proxy.Enabled = true;
+            await _proxy.UpdateAsync();
         }
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 545849c..536f4a4 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -268,6 +268,75 @@ public sealed class ProducerTests : IDisposable
         foundNonNegativeOne.Should().Be(true);
     }
 
+    [Fact]
+    public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
+    {
+        //Arrange
+        await using var connectionDown = await _fixture.DisableThePulsarConnection();
+        await using var client = CreateClient();
+        var producer = CreateProducer(client, await _fixture.CreateTopic(_cts.Token));
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => producer.DisposeAsync().AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToSend()
+    {
+        //Arrange
+        var connectionDown = await _fixture.DisableThePulsarConnection();
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, await _fixture.CreateTopic(_cts.Token));
+
+        //Act
+        await connectionDown.DisposeAsync();
+        await producer.StateChangedTo(ProducerState.Connected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => producer.Send("test", _cts.Token).AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
+    {
+        //Arrange
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, await _fixture.CreateTopic(_cts.Token));
+        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+        //Act
+        await using var connectionDown = await _fixture.DisableThePulsarConnection();
+        await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => producer.DisposeAsync().AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
+    [Fact]
+    public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToSend()
+    {
+        //Arrange
+        await using var client = CreateClient();
+        await using var producer = CreateProducer(client, await _fixture.CreateTopic(_cts.Token));
+        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+
+        //Act
+        await using (await _fixture.DisableThePulsarConnection())
+        {
+            await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
+        }
+        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        var exception = await Record.ExceptionAsync(() => producer.Send("test", _cts.Token).AsTask());
+
+        //Assert
+        exception.Should().BeNull();
+    }
+
     private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<string> producer, int numberOfMessages, CancellationToken ct)
     {
         var messageIds = new MessageId[numberOfMessages];

Reply via email to