This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 80725e3 Add Toxiproxy to simulate errors in distributed systems (#198)
80725e3 is described below
commit 80725e3bc579b58818f9970aeb6a5fa5b8061746
Author: entvex <[email protected]>
AuthorDate: Wed Jan 3 14:06:18 2024 +0100
Add Toxiproxy to simulate errors in distributed systems (#198)
This commit adds Toxiproxy to our system to help us simulate common issues
in distributed systems such as fault tolerance and error recovery. This will
help us identify and fix issues before they become critical.
---
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 1 +
tests/DotPulsar.Tests/IntegrationFixture.cs | 123 +++++++++++++++++++++------
2 files changed, 98 insertions(+), 26 deletions(-)
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 8946c9d..038df91 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -25,6 +25,7 @@
<PackageReference Include="NSubstitute" Version="5.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="Testcontainers" Version="3.6.0" />
+ <PackageReference Include="ToxiproxyNetCore" Version="1.0.35" />
<PackageReference Include="xunit" Version="2.6.4" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 001cd8a..74827a9 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -16,7 +16,10 @@ namespace DotPulsar.Tests;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
+using DotNet.Testcontainers.Networks;
using DotPulsar.Abstractions;
+using Toxiproxy.Net;
+using Toxiproxy.Net.Toxics;
using Xunit.Abstractions;
using Xunit.Sdk;
@@ -25,11 +28,20 @@ public class IntegrationFixture : IAsyncLifetime
private const string AuthenticationPlugin =
"org.apache.pulsar.client.impl.auth.AuthenticationToken";
private const string SecretKeyPath = "/pulsar/secret.key";
private const string UserName = "test-user";
- private const int Port = 6650;
+ private const int PulsarPort = 6650;
+ private const int ToxiProxyControlPort = 8474;
+ private const int ToxiProxyPort = 15124;
private readonly CancellationTokenSource _cts;
private readonly IMessageSink _messageSink;
- private readonly IContainer _cluster;
+
+ private readonly INetwork _network;
+ private readonly IContainer _pulsarCluster;
+ private readonly IContainer _toxiProxy;
+
+ private Client _toxiProxyClient;
+ private Connection _toxiProxyConnection;
+ private Proxy _toxiProxylocalToPulsarCluster;
private string? _token;
@@ -38,7 +50,7 @@ public class IntegrationFixture : IAsyncLifetime
_messageSink = messageSink;
_cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));
- var environmentVariables = new Dictionary<string, string>()
+ var environmentVariables = new Dictionary<string, string>
{
{ "PULSAR_PREFIX_tokenSecretKey", $"file://{SecretKeyPath}" },
{ "PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5" },
@@ -57,15 +69,32 @@ public class IntegrationFixture : IAsyncLifetime
$"export
CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters &&
bin/apply-config-from-env.py conf/standalone.conf && " +
$"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_
conf/client.conf && bin/pulsar standalone --no-functions-worker";
- _cluster = new ContainerBuilder()
+ _network = new NetworkBuilder()
+ .WithName(Guid.NewGuid().ToString("D"))
+ .Build();
+
+ _toxiProxy = new ContainerBuilder()
+ .WithImage("ghcr.io/shopify/toxiproxy:2.7.0")
+ .WithPortBinding(ToxiProxyControlPort, true)
+ .WithPortBinding(ToxiProxyPort, true)
+ .WithHostname("toxiproxy")
+ .WithNetwork(_network)
+
.WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(strategy
=> strategy.ForPath("/version").ForPort(ToxiProxyControlPort)))
+ .Build();
+
+ _pulsarCluster = new ContainerBuilder()
.WithImage("apachepulsar/pulsar:3.1.1")
.WithEnvironment(environmentVariables)
- .WithPortBinding(Port, true)
+ .WithHostname("pulsar")
+ .WithNetwork(_network)
.WithWaitStrategy(Wait.ForUnixContainer().UntilCommandIsCompleted(["/bin/bash",
"-c", "bin/pulsar-admin clusters list"]))
.WithCommand("/bin/bash", "-c", arguments)
.Build();
- ServiceUrl = new Uri($"pulsar://{_cluster.Hostname}:{Port}");
+ ServiceUrl = new
Uri($"pulsar://{_pulsarCluster.Hostname}:{PulsarPort}");
+ _toxiProxyConnection = new Connection();
+ _toxiProxyClient = _toxiProxyConnection.Client();
+ _toxiProxylocalToPulsarCluster = new Proxy();
}
public Uri ServiceUrl { get; private set; }
@@ -74,31 +103,54 @@ public class IntegrationFixture : IAsyncLifetime
public async Task DisposeAsync()
{
- await _cluster.DisposeAsync();
+ await _pulsarCluster.DisposeAsync();
+ await _toxiProxy.DisposeAsync();
+ _toxiProxyConnection.Dispose();
_cts.Dispose();
}
- private void HandleClusterStateChange(string state) =>
- _messageSink.OnMessage(new DiagnosticMessage($"The Pulsar cluster
changed state to: {state}"));
-
public async Task InitializeAsync()
{
- _cluster.Created += (_, _) => HandleClusterStateChange("Created");
- _cluster.Creating += (_, _) => HandleClusterStateChange("Creating");
- _cluster.Started += (_, _) => HandleClusterStateChange("Started");
- _cluster.Starting += (_, _) => HandleClusterStateChange("Starting");
- _cluster.Stopped += (_, _) => HandleClusterStateChange("Stopped");
- _cluster.Stopping += (_, _) => HandleClusterStateChange("Stopping");
-
- _messageSink.OnMessage(new DiagnosticMessage("Starting container
service"));
- await _cluster.StartAsync(_cts.Token);
- _messageSink.OnMessage(new DiagnosticMessage("The container service
has initiated. Next, we'll retrieve the endpoint."));
- var endpoint = _cluster.GetMappedPublicPort(Port);
- _messageSink.OnMessage(new DiagnosticMessage($"Endpoint opened at
{endpoint}"));
- ServiceUrl = new Uri($"pulsar://{_cluster.Hostname}:{endpoint}");
+ SubscribeToContainerEvents(_toxiProxy, "Toxiproxy");
+ SubscribeToContainerEvents(_pulsarCluster, "Pulsar cluster");
+ await _network.CreateAsync(_cts.Token);
+ _messageSink.OnMessage(new DiagnosticMessage("Starting Toxiproxy"));
+ await _toxiProxy.StartAsync(_cts.Token);
+ _messageSink.OnMessage(new DiagnosticMessage("Starting Pulsar
Cluster"));
+ await _pulsarCluster.StartAsync(_cts.Token);
+ _messageSink.OnMessage(new DiagnosticMessage("The containers has
initiated. Next, we'll configure Toxiproxy mappings."));
+ _toxiProxyConnection = new Connection(_toxiProxy.Hostname,
_toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
+
+ _toxiProxyClient = _toxiProxyConnection.Client();
+ _toxiProxylocalToPulsarCluster = new Proxy
+ {
+ Name = "localToPulsarCluster",
+ Enabled = true,
+ Listen = $"{"0.0.0.0"}:{ToxiProxyPort}",
+ Upstream = $"{"pulsar"}:{PulsarPort}"
+ };
+ await _toxiProxyClient.AddAsync(_toxiProxylocalToPulsarCluster);
+
+ _messageSink.OnMessage(new DiagnosticMessage("Toxiproxy successfully
mapped connections between host and the Pulsar Cluster."));
+ ServiceUrl = new
Uri($"pulsar://{_toxiProxy.Hostname}:{_toxiProxy.GetMappedPublicPort(ToxiProxyPort)}");
+ _messageSink.OnMessage(new DiagnosticMessage("You can connect with: "
+ ServiceUrl));
+
_token = await CreateToken(Timeout.InfiniteTimeSpan, _cts.Token);
}
+ private void HandleClusterStateChange(string containerName, string state)
=>
+ _messageSink.OnMessage(new DiagnosticMessage($"The {containerName}
changed state to: {state}"));
+
+ private void SubscribeToContainerEvents(IContainer container, string
containerName)
+ {
+ container.Created += (_, _) => HandleClusterStateChange(containerName,
"Created");
+ container.Creating += (_, _) =>
HandleClusterStateChange(containerName, "Creating");
+ container.Started += (_, _) => HandleClusterStateChange(containerName,
"Started");
+ container.Starting += (_, _) =>
HandleClusterStateChange(containerName, "Starting");
+ container.Stopped += (_, _) => HandleClusterStateChange(containerName,
"Stopped");
+ container.Stopping += (_, _) =>
HandleClusterStateChange(containerName, "Stopping");
+ }
+
public async Task<string> CreateToken(TimeSpan expiryTime,
CancellationToken cancellationToken)
{
var arguments = $"bin/pulsar tokens create --secret-key
{SecretKeyPath} --subject {UserName}";
@@ -106,7 +158,7 @@ public class IntegrationFixture : IAsyncLifetime
if (expiryTime != Timeout.InfiniteTimeSpan)
arguments += $" --expiry-time {expiryTime.TotalSeconds}s";
- var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
+ var result = await _pulsarCluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
if (result.ExitCode != 0)
throw new InvalidOperationException($"Could not create the token:
{result.Stderr}");
@@ -127,7 +179,7 @@ public class IntegrationFixture : IAsyncLifetime
{
var arguments = $"bin/pulsar-admin topics create {topic}";
- var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
+ var result = await _pulsarCluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
if (result.ExitCode != 0)
throw new Exception($"Could not create the topic:
{result.Stderr}");
@@ -144,9 +196,28 @@ public class IntegrationFixture : IAsyncLifetime
{
var arguments = $"bin/pulsar-admin topics create-partitioned-topic
{topic} -p {numberOfPartitions}";
- var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
+ var result = await _pulsarCluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
if (result.ExitCode != 0)
throw new Exception($"Could not create the partitioned topic:
{result.Stderr}");
}
+
+ public async Task SimulateDisconnect()
+ {
+ var timeoutProxy = new TimeoutToxic();
+ timeoutProxy.Attributes.Timeout = 1000000000;
+ timeoutProxy.Toxicity = 1.0;
+ await _toxiProxylocalToPulsarCluster.AddAsync(timeoutProxy);
+ await _toxiProxylocalToPulsarCluster.UpdateAsync();
+ }
+
+ public async Task RemoveAllToxins()
+ {
+ var allToxicsAsync = await
_toxiProxylocalToPulsarCluster.GetAllToxicsAsync();
+
+ foreach (var toxicBase in allToxicsAsync)
+ {
+ await
_toxiProxylocalToPulsarCluster.RemoveToxicAsync(toxicBase.Name);
+ }
+ }
}