This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 860edcd  Add token factory support, respond to server auth challenge 
on token refresh (#95)
860edcd is described below

commit 860edcd410a6c0c8c684e5a3234b10f1022d7840
Author: goldenccargill <[email protected]>
AuthorDate: Sat Jan 22 18:07:10 2022 +0100

    Add token factory support, respond to server auth challenge on token 
refresh (#95)
    
    * get pulsar manager running in docker compose
    
    * finally have a test env
    
    * extract separate test for token refresh
    
    * rollback change
    
    * add more test cases
    
    * fix docker issues
---
 src/DotPulsar/Abstractions/IPulsarClientBuilder.cs |   6 +
 src/DotPulsar/Internal/Connection.cs               |  26 +++-
 src/DotPulsar/Internal/ConnectionPool.cs           |  38 +++--
 src/DotPulsar/Internal/DefaultExceptionHandler.cs  |   1 +
 src/DotPulsar/Internal/DotPulsarEventSource.cs     |  18 +++
 .../Exceptions/TokenFactoryFailedException.cs      |  10 ++
 .../Internal/Extensions/CommandExtensions.cs       |   7 +
 src/DotPulsar/Internal/PulsarClientBuilder.cs      |  26 +++-
 src/DotPulsar/Internal/TokenFactoryExtensions.cs   |  26 ++++
 .../DotPulsar.IntegrationTests.csproj              |   6 +
 .../Fixtures/StandaloneClusterFixture.cs           |  10 +-
 .../Fixtures/StandaloneTokenClusterTest.cs         |   6 +
 .../Fixtures/TokenClusterFixture.cs                |  88 ++++++++++++
 .../ProcessAsyncHelper.cs                          | 127 +++++++++++++++++
 .../Services/PulsarServiceBase.cs                  |  32 +++--
 .../Services/ServiceFactory.cs                     |   7 +-
 .../Services/StandaloneContainerService.cs         |  41 ++----
 .../Services/StandaloneExternalService.cs          |   5 +
 .../TokenRefreshTests.cs                           | 156 +++++++++++++++++++++
 .../appdata/my-secret.key                          |   2 +
 .../docker-compose-standalone-tests.yml            |   7 +
 .../docker-compose-standalone-token-tests.yml      |  34 +++++
 .../DotPulsar.StressTests.csproj                   |   1 +
 .../Fixtures/StandaloneClusterFixture.cs           |  55 ++++----
 tests/docker-compose-standalone-tests.yml          |   9 ++
 25 files changed, 652 insertions(+), 92 deletions(-)

diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs 
b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 7ef079e..ea2ab5b 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -16,6 +16,7 @@ namespace DotPulsar.Abstractions;
 
 using System;
 using System.Security.Cryptography.X509Certificates;
+using System.Threading.Tasks;
 
 /// <summary>
 /// A pulsar client building abstraction.
@@ -33,6 +34,11 @@ public interface IPulsarClientBuilder
     IPulsarClientBuilder AuthenticateUsingToken(string token);
 
     /// <summary>
+    /// Authenticate using a (JSON Web) token factory. This is optional.
+    /// </summary>
+    IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> 
tokenFactory);
+
+    /// <summary>
     /// Set connection encryption policy. The default is 'EnforceUnencrypted' 
if the ServiceUrl scheme is 'pulsar' and 'EnforceEncrypted' if it's 
'pulsar+ssl'.
     /// </summary>
     IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy);
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 1ee72e3..60c6433 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -20,6 +20,8 @@ using Extensions;
 using PulsarApi;
 using System;
 using System.Buffers;
+using System.Diagnostics;
+using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -29,14 +31,16 @@ public sealed class Connection : IConnection
     private readonly ChannelManager _channelManager;
     private readonly PingPongHandler _pingPongHandler;
     private readonly IPulsarStream _stream;
+    private readonly Func<Task<string>>? _accessTokenFactory;
     private int _isDisposed;
 
-    public Connection(IPulsarStream stream, TimeSpan keepAliveInterval)
+    public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, 
Func<Task<string>>? accessTokenFactory)
     {
         _lock = new AsyncLock();
         _channelManager = new ChannelManager();
         _pingPongHandler = new PingPongHandler(this, keepAliveInterval);
         _stream = stream;
+        _accessTokenFactory = accessTokenFactory;
     }
 
     public async ValueTask<bool> HasChannels(CancellationToken 
cancellationToken)
@@ -81,6 +85,9 @@ public sealed class Connection : IConnection
         return await responseTask.ConfigureAwait(false);
     }
 
+    private Task Send(CommandAuthResponse authResponse, CancellationToken 
cancellationToken)
+        => Send(authResponse.AsBaseCommand(), cancellationToken);
+
     public Task Send(CommandPing command, CancellationToken cancellationToken)
         => Send(command.AsBaseCommand(), cancellationToken);
 
@@ -268,13 +275,13 @@ public sealed class Connection : IConnection
         }
     }
 
-    public async Task ProcessIncommingFrames()
+    public async Task ProcessIncommingFrames(CancellationToken 
cancellationToken)
     {
         await Task.Yield();
 
         try
         {
-            await foreach (var frame in _stream.Frames())
+            await foreach (var frame in _stream.Frames(cancellationToken))
             {
                 var commandSize = frame.ReadUInt32(0, true);
                 var command = 
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
@@ -285,7 +292,18 @@ public sealed class Connection : IConnection
                 if (command.CommandType == BaseCommand.Type.Message)
                     _channelManager.Incoming(command.Message, new 
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
                 else
-                    _channelManager.Incoming(command);
+                {
+                    if (_accessTokenFactory != null && command.CommandType == 
BaseCommand.Type.AuthChallenge)
+                    {
+                        var token = await _accessTokenFactory.GetToken();
+                        await Send(new CommandAuthResponse { Response = new 
AuthData { Data = Encoding.UTF8.GetBytes(token), AuthMethodName = "token" } }, 
cancellationToken);
+                        DotPulsarEventSource.Log.TokenRefreshed();
+                    }
+                    else
+                    {
+                        _channelManager.Incoming(command);
+                    }
+                }
             }
         }
         catch
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 14ab85f..9e5dc2f 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -21,6 +21,7 @@ using PulsarApi;
 using System;
 using System.Collections.Concurrent;
 using System.Linq;
+using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -36,6 +37,8 @@ public sealed class ConnectionPool : IConnectionPool
     private readonly Task _closeInactiveConnections;
     private readonly string? _listenerName;
     private readonly TimeSpan _keepAliveInterval;
+    private readonly IExecute _executor;
+    private readonly Func<Task<string>>? _accessTokenFactory;
 
     public ConnectionPool(
         CommandConnect commandConnect,
@@ -44,7 +47,9 @@ public sealed class ConnectionPool : IConnectionPool
         EncryptionPolicy encryptionPolicy,
         TimeSpan closeInactiveConnectionsInterval,
         string? listenerName,
-        TimeSpan keepAliveInterval)
+        TimeSpan keepAliveInterval,
+        IExecute executor,
+        Func<Task<string>>? accessTokenFactory)
     {
         _lock = new AsyncLock();
         _commandConnect = commandConnect;
@@ -56,6 +61,8 @@ public sealed class ConnectionPool : IConnectionPool
         _cancellationTokenSource = new CancellationTokenSource();
         _closeInactiveConnections = 
CloseInactiveConnections(closeInactiveConnectionsInterval, 
_cancellationTokenSource.Token);
         _keepAliveInterval = keepAliveInterval;
+        _executor = executor;
+        _accessTokenFactory = accessTokenFactory;
     }
 
     public async ValueTask DisposeAsync()
@@ -105,6 +112,7 @@ public sealed class ConnectionPool : IConnectionPool
 
             if (response.LookupTopicResponse.ProxyThroughServiceUrl)
             {
+                await connection.DisposeAsync();
                 var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
                 return await GetConnection(url, 
cancellationToken).ConfigureAwait(false);
             }
@@ -157,14 +165,21 @@ public sealed class ConnectionPool : IConnectionPool
     private async Task<Connection> EstablishNewConnection(PulsarUrl url, 
CancellationToken cancellationToken)
     {
         var stream = await 
_connector.Connect(url.Physical).ConfigureAwait(false);
-        var connection = new Connection(new PulsarStream(stream), 
_keepAliveInterval);
+        var connection = new Connection(new PulsarStream(stream), 
_keepAliveInterval, _accessTokenFactory);
         DotPulsarEventSource.Log.ConnectionCreated();
         _connections[url] = connection;
-        _ = connection.ProcessIncommingFrames().ContinueWith(t => 
DisposeConnection(url));
+        _ = 
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => 
DisposeConnection(url));
         var commandConnect = _commandConnect;
 
+        if (_accessTokenFactory != null)
+        {
+            var token = await _accessTokenFactory.GetToken(_executor);
+            commandConnect.AuthMethodName = "token";
+            commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
+        }
+
         if (url.ProxyThroughServiceUrl)
-            commandConnect = WithProxyToBroker(_commandConnect, url.Logical);
+            commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
         var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.Connected);
@@ -184,15 +199,16 @@ public sealed class ConnectionPool : IConnectionPool
     {
         return new CommandConnect
         {
-            AuthData = commandConnect.AuthData,
-            AuthMethod = commandConnect.AuthMethod,
-            AuthMethodName = commandConnect.AuthMethodName,
+            AuthData = commandConnect.ShouldSerializeAuthData() ? 
commandConnect.AuthData : null,
+            AuthMethod = commandConnect.ShouldSerializeAuthMethod() ? 
commandConnect.AuthMethod : AuthMethod.AuthMethodNone,
+            AuthMethodName = commandConnect.ShouldSerializeAuthMethodName() ? 
commandConnect.AuthMethodName : null,
             ClientVersion = commandConnect.ClientVersion,
-            OriginalPrincipal = commandConnect.OriginalPrincipal,
+            OriginalPrincipal = 
commandConnect.ShouldSerializeOriginalPrincipal() ? 
commandConnect.OriginalPrincipal : null,
             ProtocolVersion = commandConnect.ProtocolVersion,
-            OriginalAuthData = commandConnect.OriginalAuthData,
-            OriginalAuthMethod = commandConnect.OriginalAuthMethod,
-            ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}"
+            OriginalAuthData = 
commandConnect.ShouldSerializeOriginalAuthData() ? 
commandConnect.OriginalAuthData : null,
+            OriginalAuthMethod = 
commandConnect.ShouldSerializeOriginalAuthMethod() ? 
commandConnect.OriginalAuthMethod : null,
+            ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}",
+            FeatureFlags = commandConnect.FeatureFlags
         };
     }
 
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs 
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index a42eeef..019340b 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -62,6 +62,7 @@ public sealed class DefaultExceptionHandler : IHandleException
                 SocketError.NetworkUnreachable => FaultAction.Rethrow,
                 _ => FaultAction.Retry
             },
+            TokenFactoryFailedException => FaultAction.Retry,
             _ => FaultAction.Rethrow
         };
 }
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs 
b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index 5547b07..aaccbc4 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -38,6 +38,10 @@ public sealed class DotPulsarEventSource
     public void ReaderCreated() { }
 
     public void ReaderDisposed() { }
+
+    public void TokenRefreshed() { }
+
+    public long TokenRefreshCount => 0;
 }
 
 #else
@@ -76,11 +80,15 @@ public sealed class DotPulsarEventSource : EventSource
 
     private PollingCounter? _currentReadersCounter;
     private long _currentReaders;
+
+    private PollingCounter? _tokenRefreshCounter;
+    private long _totalTokenRefreshes;
 #pragma warning restore IDE0052 // Remove unread private members
 
     public static readonly DotPulsarEventSource Log = new();
 
     public DotPulsarEventSource() : base("DotPulsar") { }
+    public long TokenRefreshCount => _totalTokenRefreshes;
 
     public void ClientCreated()
     {
@@ -137,6 +145,11 @@ public sealed class DotPulsarEventSource : EventSource
         Interlocked.Decrement(ref _currentReaders);
     }
 
+    public void TokenRefreshed()
+    {
+        Interlocked.Increment(ref _totalTokenRefreshes);
+    }
+
     protected override void OnEventCommand(EventCommandEventArgs command)
     {
         if (command.Command != EventCommand.Enable)
@@ -191,6 +204,11 @@ public sealed class DotPulsarEventSource : EventSource
         {
             DisplayName = "Current number of readers"
         };
+
+        _tokenRefreshCounter ??= new PollingCounter("total-token-refreshes", 
this, () => Volatile.Read(ref _totalTokenRefreshes))
+        {
+            DisplayName = "Number of times token was refreshed"
+        };
     }
 }
 #endif
diff --git a/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs 
b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
new file mode 100644
index 0000000..7dee344
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Exceptions;
+
+using System;
+
+public class TokenFactoryFailedException : Exception
+{
+    public TokenFactoryFailedException(Exception innerException) : 
base("Exception when trying to fetch token from token factory", innerException)
+    {
+    }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs 
b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index a17e313..1853398 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -100,6 +100,13 @@ public static class CommandExtensions
             Ping = command
         };
 
+    public static BaseCommand AsBaseCommand(this CommandAuthResponse command)
+        => new()
+        {
+            CommandType = BaseCommand.Type.AuthResponse,
+            AuthResponse = command
+        };
+
     public static BaseCommand AsBaseCommand(this CommandPong command)
         => new()
         {
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs 
b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index fa06e04..ddd1a52 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar.Internal;
 
+using Abstractions;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using PulsarApi;
@@ -21,6 +22,7 @@ using System;
 using System.Collections.Generic;
 using System.Security.Cryptography.X509Certificates;
 using System.Text;
+using System.Threading.Tasks;
 
 public sealed class PulsarClientBuilder : IPulsarClientBuilder
 {
@@ -36,6 +38,7 @@ public sealed class PulsarClientBuilder : IPulsarClientBuilder
     private bool _verifyCertificateAuthority;
     private bool _verifyCertificateName;
     private TimeSpan _closeInactiveConnectionsInterval;
+    private Func<Task<string>>? _tokenFactory;
 
     public PulsarClientBuilder()
     {
@@ -69,6 +72,15 @@ public sealed class PulsarClientBuilder : 
IPulsarClientBuilder
         return this;
     }
 
+    public IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> 
tokenFactory)
+    {
+        _tokenFactory = tokenFactory;
+        var featureFlags = _commandConnect.FeatureFlags ?? new FeatureFlags();
+        featureFlags.SupportsAuthRefresh = true;
+        _commandConnect.FeatureFlags = featureFlags;
+        return this;
+    }
+
     public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy 
encryptionPolicy)
     {
         _encryptionPolicy = encryptionPolicy;
@@ -152,13 +164,21 @@ public sealed class PulsarClientBuilder : 
IPulsarClientBuilder
         else
             throw new InvalidSchemeException($"Invalid scheme '{scheme}'. 
Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
 
-
         var connector = new Connector(_clientCertificates, 
_trustedCertificateAuthority, _verifyCertificateAuthority, 
_verifyCertificateName);
-        var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, 
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, 
_listenerName, _keepAliveInterval);
-        var processManager = new ProcessManager(connectionPool);
+
         var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) 
{ new DefaultExceptionHandler(_retryInterval) };
         var exceptionHandlerPipeline = new 
ExceptionHandlerPipeline(exceptionHandlers);
+        var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, 
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, 
_listenerName,
+            _keepAliveInterval,
+            new Executor(Guid.Empty, new EmptyRegisterEvent(), 
exceptionHandlerPipeline),
+            _tokenFactory);
+        var processManager = new ProcessManager(connectionPool);
 
         return new PulsarClient(connectionPool, processManager, 
exceptionHandlerPipeline, _serviceUrl);
     }
 }
+
+internal class EmptyRegisterEvent : IRegisterEvent
+{
+    public void Register(IEvent @event) { }
+}
diff --git a/src/DotPulsar/Internal/TokenFactoryExtensions.cs 
b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
new file mode 100644
index 0000000..3aa26bf
--- /dev/null
+++ b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
@@ -0,0 +1,26 @@
+namespace DotPulsar.Internal;
+
+using Abstractions;
+using Exceptions;
+using System;
+using System.Threading.Tasks;
+
+internal static class TokenFactoryExtensions
+{
+    public static async Task<string> GetToken(this Func<Task<string>> 
tokenFactory, IExecute executor)
+    {
+        return await executor.Execute(tokenFactory.GetToken);
+    }
+
+    public static async Task<string> GetToken(this Func<Task<string>> 
tokenFactory)
+    {
+        try
+        {
+            return await tokenFactory();
+        }
+        catch (Exception e)
+        {
+            throw new TokenFactoryFailedException(e);
+        }
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj 
b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
index c65fb53..c1997b6 100644
--- a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
+++ b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
@@ -28,6 +28,12 @@
       <None Update="docker-compose-standalone-tests.yml">
         <CopyToOutputDirectory>Always</CopyToOutputDirectory>
       </None>
+      <None Update="docker-compose-standalone-token-tests.yml">
+        <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+      </None>
+      <None Update="appdata\my-secret.key">
+        <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+      </None>
     </ItemGroup>
 
 </Project>
diff --git 
a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs 
b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
index c3e8d9d..eeb3090 100644
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
@@ -18,14 +18,22 @@ using Abstraction;
 using Services;
 using System.Threading.Tasks;
 using Xunit;
+using Xunit.Abstractions;
 
 public class StandaloneClusterFixture : IAsyncLifetime
 {
+    private readonly IMessageSink _messageSink;
+
+    public StandaloneClusterFixture(IMessageSink messageSink)
+    {
+        _messageSink = messageSink;
+    }
+
     public IPulsarService? PulsarService { private set; get; }
 
     public async Task InitializeAsync()
     {
-        PulsarService = ServiceFactory.CreatePulsarService();
+        PulsarService = ServiceFactory.CreatePulsarService(_messageSink);
         await PulsarService.InitializeAsync();
     }
 
diff --git 
a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs 
b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
new file mode 100644
index 0000000..cb27b93
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
@@ -0,0 +1,6 @@
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Xunit;
+
+[CollectionDefinition(nameof(StandaloneTokenClusterTest))]
+public class StandaloneTokenClusterTest : 
ICollectionFixture<TokenClusterFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs 
b/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
new file mode 100644
index 0000000..3c57911
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
@@ -0,0 +1,88 @@
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Abstraction;
+using Services;
+using System;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
+
+public class TokenClusterFixture : PulsarServiceBase
+{
+    private readonly IMessageSink _messageSink;
+
+    public TokenClusterFixture(IMessageSink messageSink) : base(messageSink)
+    {
+        _messageSink = messageSink;
+    }
+
+    public IPulsarService PulsarService => this;
+
+    public override async Task InitializeAsync()
+    {
+        await TakeDownPulsar(); // clean-up if anything was left running from 
previous run
+
+        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f 
docker-compose-standalone-token-tests.yml up -d")
+            .ThrowOnFailure();
+
+        var waitTries = 10;
+
+        using var handler = new HttpClientHandler { AllowAutoRedirect = true };
+
+        using var client = new HttpClient(handler);
+
+        var token = await GetAuthToken(false);
+
+        client.DefaultRequestHeaders.Authorization = new 
AuthenticationHeaderValue("Bearer", token);
+
+        while (waitTries > 0)
+        {
+            try
+            {
+                await 
client.GetAsync($"{PulsarService.GetWebServiceUri()}/metrics/").ConfigureAwait(false);
+                return;
+            }
+            catch(Exception e)
+            {
+                _messageSink.OnMessage(new DiagnosticMessage("Error trying to 
fetch metrics: {0}", e));
+                waitTries--;
+                await Task.Delay(5000).ConfigureAwait(false);
+            }
+        }
+
+        throw new Exception("Unable to confirm Pulsar has initialized");
+    }
+
+    protected override async Task OnDispose()
+        => await TakeDownPulsar();
+
+    public override Uri GetBrokerUri() => new("pulsar://localhost:54547");
+
+    public override Uri GetWebServiceUri() => new("http://localhost:54548";);
+
+    private Task TakeDownPulsar()
+        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f 
docker-compose-standalone-token-tests.yml down")
+            .LogFailure(s => MessageSink.OnMessage(new 
DiagnosticMessage("Error bringing down container: {0}", s)));
+
+    public static async Task<string> GetAuthToken(bool includeExpiry)
+    {
+        var arguments = "exec pulsar-tokens bin/pulsar tokens create 
--secret-key file:///appdata/my-secret.key --subject test-user";
+
+        if (includeExpiry)
+        {
+            arguments += " --expiry-time 10s";
+        }
+
+        var result = await ProcessAsyncHelper.ExecuteShellCommand("docker",
+            arguments);
+
+        if (!result.Completed)
+        {
+            throw new InvalidOperationException($"Getting token from container 
failed{Environment.NewLine}{result.Output}");
+        }
+
+        return result.Output;
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs 
b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
new file mode 100644
index 0000000..56c00b8
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
@@ -0,0 +1,127 @@
+using System;
+using System.Diagnostics;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace DotPulsar.IntegrationTests;
+
+public static class ProcessAsyncHelper
+{
+    public static async Task ThrowOnFailure(this Task<ProcessResult> 
resultTask)
+    {
+        var result = await resultTask;
+
+        if (!result.Completed)
+        {
+            throw new InvalidOperationException($"Process did not complete 
correctly, {Environment.NewLine}{result.Output}");
+        }
+    }
+
+    public static async Task LogFailure(this Task<ProcessResult> resultTask, 
Action<string> logAction)
+    {
+        var result = await resultTask;
+
+        if (!result.Completed)
+        {
+            logAction(result.Output);
+        }
+    }
+
+    public static async Task<ProcessResult> ExecuteShellCommand(string 
command, string arguments)
+    {
+        var result = new ProcessResult();
+
+        using var process = new Process();
+
+        process.StartInfo.FileName = command;
+        process.StartInfo.Arguments = arguments;
+        process.StartInfo.UseShellExecute = false;
+        process.StartInfo.RedirectStandardOutput = true;
+        process.StartInfo.RedirectStandardError = true;
+        process.StartInfo.CreateNoWindow = true;
+
+        var outputBuilder = new StringBuilder();
+        var outputCloseEvent = new TaskCompletionSource<bool>();
+
+        process.OutputDataReceived += (s, e) =>
+        {
+            // The output stream has been closed i.e. the process has 
terminated
+            if (e.Data == null)
+            {
+                outputCloseEvent.SetResult(true);
+            }
+            else
+            {
+                outputBuilder.Append(e.Data);
+            }
+        };
+
+        var errorBuilder = new StringBuilder();
+        var errorCloseEvent = new TaskCompletionSource<bool>();
+
+        process.ErrorDataReceived += (s, e) =>
+        {
+            // The error stream has been closed i.e. the process has terminated
+            if (e.Data == null)
+            {
+                errorCloseEvent.SetResult(true);
+            }
+            else
+            {
+                errorBuilder.Append(e.Data);
+            }
+        };
+
+        bool isStarted;
+
+        try
+        {
+            isStarted = process.Start();
+        }
+        catch (Exception error)
+        {
+            // Usually it occurs when an executable file is not found or is 
not executable
+
+            result.Completed = true;
+            result.ExitCode = -1;
+            result.Output = error.Message;
+
+            isStarted = false;
+        }
+
+        if (isStarted)
+        {
+            // Reads the output stream first and then waits because deadlocks 
are possible
+            process.BeginOutputReadLine();
+            process.BeginErrorReadLine();
+
+            // Creates task to wait for process exit using timeout
+            var waitForExit = WaitForExitAsync(process);
+
+            // Create task to wait for process exit and closing all output 
streams
+            await Task.WhenAll(waitForExit, outputCloseEvent.Task, 
errorCloseEvent.Task);
+
+            result.Completed = waitForExit.Result;
+            result.ExitCode = process.ExitCode;
+            // Adds process output if it was completed with error
+            result.Output = process.ExitCode != 0 ? 
$"{outputBuilder}{errorBuilder}" : outputBuilder.ToString();
+        }
+
+        return result;
+    }
+
+
+    private static async Task<bool> WaitForExitAsync(Process process)
+    {
+        await process.WaitForExitAsync();
+        return process.ExitCode == 0;
+    }
+
+
+    public struct ProcessResult
+    {
+        public bool Completed;
+        public int? ExitCode;
+        public string Output;
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs 
b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
index 7f642d8..c614a84 100644
--- a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
@@ -20,33 +20,45 @@ using System.Net.Http;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
 
-public class PulsarServiceBase : IPulsarService
+public abstract class PulsarServiceBase : IPulsarService
 {
+    protected readonly IMessageSink MessageSink;
     private readonly CancellationTokenSource _cts;
     private readonly HttpClient _adminClient;
 
-    protected PulsarServiceBase()
+    protected PulsarServiceBase(IMessageSink messageSink)
     {
+        MessageSink = messageSink;
         _cts = new CancellationTokenSource();
         _adminClient = new HttpClient();
     }
 
-    public virtual Task InitializeAsync()
-        => Task.CompletedTask;
+    public abstract Task InitializeAsync();
 
-    public virtual Task DisposeAsync()
+    public async Task DisposeAsync()
     {
         _adminClient.Dispose();
         _cts.Dispose();
-        return Task.CompletedTask;
+
+        try
+        {
+            await OnDispose();
+        }
+        catch (Exception e)
+        {
+            MessageSink.OnMessage(new DiagnosticMessage("Error disposing: 
{0}", e));
+        }
     }
 
-    public virtual Uri GetBrokerUri()
-        => throw new NotImplementedException();
+    protected virtual Task OnDispose()
+        => Task.CompletedTask;
+
+    public abstract Uri GetBrokerUri();
 
-    public virtual Uri GetWebServiceUri()
-        => throw new NotImplementedException();
+    public abstract Uri GetWebServiceUri();
 
     public async Task<HttpResponseMessage?> CreatePartitionedTopic(string 
restTopic, int numPartitions)
     {
diff --git a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs 
b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
index 3b9e0a1..90e10ec 100644
--- a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
@@ -15,21 +15,22 @@
 namespace DotPulsar.IntegrationTests.Services;
 
 using Abstraction;
+using Xunit.Abstractions;
 
 public static class ServiceFactory
 {
     private const string _pulsarDeploymentType = "PULSAR_DEPLOYMENT_TYPE";
     private const string _containerDeployment = "container";
 
-    public static IPulsarService CreatePulsarService()
+    public static IPulsarService CreatePulsarService(IMessageSink messageSink)
     {
         var deploymentType = 
System.Environment.GetEnvironmentVariable(_pulsarDeploymentType);
 
         if (deploymentType == _containerDeployment)
         {
-            return new StandaloneContainerService();
+            return new StandaloneContainerService(messageSink);
         }
 
-        return new StandaloneExternalService();
+        return new StandaloneExternalService(messageSink);
     }
 }
diff --git 
a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs 
b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
index e54b0fb..4d7d42e 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
@@ -15,18 +15,21 @@
 namespace DotPulsar.IntegrationTests.Services;
 
 using System;
-using System.Diagnostics;
 using System.Net.Http;
 using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
 
 public sealed class StandaloneContainerService : PulsarServiceBase
 {
+    public StandaloneContainerService(IMessageSink messageSink) : 
base(messageSink) { }
+
     public override async Task InitializeAsync()
     {
-        await base.InitializeAsync().ConfigureAwait(false);
-        TakeDownPulsar(); // clean-up if anything was left running from 
previous run
+        await TakeDownPulsar(); // clean-up if anything was left running from 
previous run
 
-        RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml 
up -d");
+        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f 
docker-compose-standalone-tests.yml up -d")
+            .ThrowOnFailure();
 
         var waitTries = 10;
 
@@ -51,33 +54,11 @@ public sealed class StandaloneContainerService : 
PulsarServiceBase
         throw new Exception("Unable to confirm Pulsar has initialized");
     }
 
-    public override async Task DisposeAsync()
-    {
-        await base.DisposeAsync().ConfigureAwait(false);
-        TakeDownPulsar();
-    }
-
-    private static void TakeDownPulsar()
-        => RunProcess("docker-compose", "-f 
docker-compose-standalone-tests.yml down");
-
-    private static void RunProcess(string name, string arguments)
-    {
-        var processStartInfo = new ProcessStartInfo { FileName = name, 
Arguments = arguments };
+    protected override Task OnDispose() => TakeDownPulsar();
 
-        processStartInfo.Environment["TAG"] = "test";
-        processStartInfo.Environment["CONFIGURATION"] = "Debug";
-        processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
-
-        var process = Process.Start(processStartInfo);
-
-        if (process is null)
-            throw new Exception("Process.Start returned null");
-
-        process.WaitForExit();
-
-        if (process.ExitCode != 0)
-            throw new Exception($"Exit code {process.ExitCode} when running 
process {name} with arguments {arguments}");
-    }
+    private Task TakeDownPulsar()
+        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f 
docker-compose-standalone-tests.yml down")
+            .LogFailure(s => MessageSink.OnMessage(new 
DiagnosticMessage("Error bringing down container: {0}", s)));
 
     public override Uri GetBrokerUri()
         => new("pulsar://localhost:54545");
diff --git 
a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs 
b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
index ed7222b..c1ce2f0 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
@@ -15,9 +15,14 @@
 namespace DotPulsar.IntegrationTests.Services;
 
 using System;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
 
 public sealed class StandaloneExternalService : PulsarServiceBase
 {
+    public StandaloneExternalService(IMessageSink messageSink) : 
base(messageSink) { }
+    public override Task InitializeAsync() => Task.CompletedTask;
+
     public override Uri GetBrokerUri()
         => new("pulsar://localhost:6650");
 
diff --git a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs 
b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
new file mode 100644
index 0000000..5a0891a
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
@@ -0,0 +1,156 @@
+namespace DotPulsar.IntegrationTests;
+
+using Abstraction;
+using Abstractions;
+using Extensions;
+using Fixtures;
+using Internal;
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection(nameof(StandaloneTokenClusterTest))]
+public class TokenRefreshTests
+{
+    public enum TokenTestRefreshType
+    {
+        Standard,
+        FailAtStartup,
+        FailOnRefresh,
+        TimeoutOnRefresh
+    }
+
+    private const string MyTopic = "persistent://public/default/mytopic";
+    private readonly ITestOutputHelper _testOutputHelper;
+    private readonly IPulsarService _pulsarService;
+
+    public TokenRefreshTests(ITestOutputHelper outputHelper, 
TokenClusterFixture fixture)
+    {
+        _testOutputHelper = outputHelper;
+        Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != 
null");
+        _pulsarService = fixture.PulsarService;
+    }
+
+    [InlineData(TokenTestRefreshType.Standard, 0)] // Standard happy path with 
no token refresh failures
+    [InlineData(TokenTestRefreshType.FailAtStartup, 1)] // 1 Failure at 
startup, not on refresh
+    [InlineData(TokenTestRefreshType.FailOnRefresh, 2)] // Fails on refresh 
which will force a reconnection and fail once more on new connection
+    [InlineData(TokenTestRefreshType.TimeoutOnRefresh, 0)] // Connection will 
be disconnected by server due to slow response to auth challenge
+    [Theory]
+    public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int 
timesToFail)
+    {
+        var initialRefreshCount = DotPulsarEventSource.Log.TokenRefreshCount;
+
+        var publishingStarted = false;
+        var delayedNames = new HashSet<string>();
+        Task<string> GetToken(string name, ref int count)
+        {
+            if (refreshType is TokenTestRefreshType.Standard)
+            {
+                return GetAuthToken(name);
+            }
+
+            if (refreshType is TokenTestRefreshType.FailAtStartup && 
!publishingStarted && ++count <= timesToFail)
+            {
+                return Task.FromException<string>(new Exception("Initial Token 
Failed"));
+            }
+
+            if (refreshType is TokenTestRefreshType.FailOnRefresh && 
publishingStarted && ++count <= timesToFail)
+            {
+                return Task.FromException<string>(count == 1 ? new 
Exception("Refresh Failed") : new Exception("Initial Token Failed"));
+            }
+
+            if (refreshType is TokenTestRefreshType.TimeoutOnRefresh && 
publishingStarted && !delayedNames.Contains(name))
+            {
+                delayedNames.Add(name);
+                return Task.Delay(6000).ContinueWith(_ => 
GetAuthToken(name)).Unwrap();
+            }
+
+            return GetAuthToken(name);
+        }
+
+        var producerTokenCount = 0;
+        await using var producerClient = GetPulsarClient("Producer", ()
+            => GetToken("Producer", ref producerTokenCount));
+
+        var consumerTokenCount = 0;
+        await using var consumerClient = GetPulsarClient("Consumer", ()
+            => GetToken("Consumer", ref consumerTokenCount));
+
+        var producer = CreateProducer(producerClient);
+
+        var consumer = consumerClient.NewConsumer(Schema.String)
+            .Topic(MyTopic)
+            .SubscriptionName("test-sub")
+            .InitialPosition(SubscriptionInitialPosition.Earliest)
+            .Create();
+
+        var received = new List<string>();
+        const int messageCount = 20;
+
+        var publisherTask = Task.Run(async () =>
+        {
+            for (var i = 0; i < messageCount; i++)
+            {
+                _testOutputHelper.WriteLine("Trying to publish message for 
index {0}", i);
+                var messageId = await 
producer.Send(Encoding.UTF8.GetBytes(i.ToString()));
+                publishingStarted = true;
+                _testOutputHelper.WriteLine("Published message {0} for index 
{1}", messageId, i);
+
+                await Task.Delay(1000);
+            }
+        });
+
+        var consumerTask = Task.Run(async () =>
+        {
+            await consumer.OnStateChangeTo(ConsumerState.Active);
+            for (int j = 0; j < messageCount; j++)
+            {
+                var message = await consumer.Receive();
+                received.Add(Encoding.UTF8.GetString(message.Data));
+            }
+        });
+
+        var all = Task.WhenAll(consumerTask, publisherTask);
+        var timeoutTask = Task.Delay(60_000);
+        var result = await Task.WhenAny(all, timeoutTask);
+        Assert.True(result != timeoutTask);
+
+        if (refreshType is TokenTestRefreshType.Standard)
+        {
+            Assert.True(DotPulsarEventSource.Log.TokenRefreshCount > 
initialRefreshCount);
+        }
+
+        var expected = Enumerable.Range(0, messageCount).Select(i => 
i.ToString()).ToList();
+        var missing = expected.Except(received).ToList();
+
+        if (missing.Count > 0)
+        {
+            Assert.True(false, $"Missing values: {string.Join(",", missing)}");
+        }
+    }
+
+    private static IProducer<ReadOnlySequence<byte>> 
CreateProducer(IPulsarClient producerClient)
+        => producerClient.NewProducer()
+            .Topic(MyTopic)
+            .Create();
+
+    private IPulsarClient GetPulsarClient(string name, Func<Task<string>> 
tokenFactory)
+        => PulsarClient.Builder()
+            .AuthenticateUsingToken(tokenFactory)
+            .RetryInterval(TimeSpan.FromSeconds(1))
+            .ExceptionHandler(ec => _testOutputHelper.WriteLine("Error 
(handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name, 
ec.Exception))
+            .ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+
+    private async Task<string> GetAuthToken(string name)
+    {
+        var result = await TokenClusterFixture.GetAuthToken(true);
+        _testOutputHelper.WriteLine("{0} received token {1}", name, result);
+        return result;
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/appdata/my-secret.key 
b/tests/DotPulsar.IntegrationTests/appdata/my-secret.key
new file mode 100644
index 0000000..457a354
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/appdata/my-secret.key
@@ -0,0 +1,2 @@
+��O����
+���m   ���m4�x#�#�vn!���
\ No newline at end of file
diff --git 
a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml 
b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
index a3642ec..0e517e5 100644
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
+++ b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
@@ -12,3 +12,10 @@ services:
       PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
     command: |
       /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && 
bin/pulsar standalone --no-functions-worker"
+    networks:
+      - pulsar-standalone
+        
+networks:
+  pulsar-standalone:
+    name: pulsar-standalone
+    driver: bridge
diff --git 
a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml 
b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
new file mode 100644
index 0000000..af76c5b
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
@@ -0,0 +1,34 @@
+version: '3.5'
+  
+services:
+        
+  pulsar-tokens:
+    container_name: pulsar-tokens
+    image: 'apachepulsar/pulsar:2.7.0'
+    ports:
+      - '54548:8081'
+      - '54547:6651'
+    volumes:
+      - ./appdata/:/appdata
+    environment:
+      - PULSAR_PREFIX_tokenSecretKey=file:///appdata/my-secret.key
+      - authenticationEnabled=true
+      - authorizationEnabled=true
+      - 
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+      - authenticateOriginalAuthData=false
+      - 
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
+      - 
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.CoBrja1EHr0e2kZKGFS8M-xS2SOC2E08yZmjktvcYOs
+      - superUserRoles=test-user
+      - PULSAR_PREFIX_authenticationRefreshCheckSeconds=5
+      - webServicePort=8081
+      - brokerServicePort=6651
+#      - PULSAR_LOG_LEVEL=debug
+    command: |
+      /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && 
bin/pulsar standalone --no-functions-worker"
+    networks:
+      - pulsar-tokens
+        
+networks:
+  pulsar-tokens:
+    name: pulsar-tokens
+    driver: bridge
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj 
b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index abf1d14..5524900 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -26,6 +26,7 @@
 
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+    <ProjectReference 
Include="..\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj" />
   </ItemGroup>
 
 </Project>
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs 
b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index ef7321c..500cd84 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -14,19 +14,29 @@
 
 namespace DotPulsar.StressTests.Fixtures;
 
+using IntegrationTests;
 using System;
-using System.Diagnostics;
 using System.Net.Http;
 using System.Threading.Tasks;
 using Xunit;
+using Xunit.Abstractions;
+using Xunit.Sdk;
 
 public class StandaloneClusterFixture : IAsyncLifetime
 {
+    private readonly IMessageSink _messageSink;
+
+    public StandaloneClusterFixture(IMessageSink messageSink)
+    {
+        _messageSink = messageSink;
+    }
+
     public async Task InitializeAsync()
     {
-        TakeDownPulsar(); // clean-up if anything was left running from 
previous run
+        await TakeDownPulsar(); // clean-up if anything was left running from 
previous run
 
-        RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml 
up -d");
+        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f 
docker-compose-standalone-tests.yml up -d")
+            .ThrowOnFailure();
 
         var waitTries = 10;
 
@@ -54,34 +64,19 @@ public class StandaloneClusterFixture : IAsyncLifetime
         throw new Exception("Unable to confirm Pulsar has initialized");
     }
 
-    public Task DisposeAsync()
+    public async Task DisposeAsync()
     {
-        TakeDownPulsar();
-        return Task.CompletedTask;
-    }
-
-    private static void TakeDownPulsar()
-        => RunProcess("docker-compose", "-f 
docker-compose-standalone-tests.yml down");
-
-    private static void RunProcess(string name, string arguments)
-    {
-        var processStartInfo = new ProcessStartInfo
+        try
         {
-            FileName = name,
-            Arguments = arguments
-        };
-
-        processStartInfo.Environment["TAG"] = "test";
-        processStartInfo.Environment["CONFIGURATION"] = "Debug";
-        processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
-
-        var process = Process.Start(processStartInfo);
-        if (process is null)
-            throw new Exception("Process.Start returned null");
-
-        process.WaitForExit();
-
-        if (process.ExitCode != 0)
-            throw new Exception($"Exit code {process.ExitCode} when running 
process {name} with arguments {arguments}");
+            await TakeDownPulsar();
+        }
+        catch (Exception e)
+        {
+            _messageSink.OnMessage(new DiagnosticMessage("Error taking down 
pulsar: {0}", e));
+        }
     }
+
+    private Task TakeDownPulsar()
+        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f 
docker-compose-standalone-tests.yml down")
+            .LogFailure(s => _messageSink.OnMessage(new 
DiagnosticMessage("Error bringing down container: {0}", s)));
 }
diff --git a/tests/docker-compose-standalone-tests.yml 
b/tests/docker-compose-standalone-tests.yml
index b9b3cd9..bca3999 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -12,6 +12,8 @@ services:
       PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
     command: |
       /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && 
bin/pulsar standalone --no-functions-worker"
+    networks:
+      - pulsar-stresstests
 
   loadbalancer:
     container_name: loadbalancer
@@ -25,3 +27,10 @@ services:
       - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
     depends_on:
       - pulsar
+    networks:
+      - pulsar-stresstests
+
+networks:
+  pulsar-stresstests:
+    name: pulsar-stresstests
+    driver: bridge
\ No newline at end of file

Reply via email to