This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7497c3  Optimizing tests by reusing the token and also ensuring all 
integration tests are run with a cancellation token (1 minute timeout). Found 
and fixed some more race conditions and reran the tests 30 times without any 
failures.
c7497c3 is described below

commit c7497c33dfde0732ef11c4de83322419dc55ad47
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Oct 27 11:52:02 2023 +0200

    Optimizing tests by reusing the token and also ensuring all integration 
tests are run with a cancellation token (1 minute timeout).
    Found and fixed some more race conditions and reran the tests 30 times 
without any failures.
---
 src/DotPulsar/Internal/AsyncQueue.cs               | 17 ++--
 src/DotPulsar/Internal/Connection.cs               |  5 +-
 src/DotPulsar/Internal/ConnectionPool.cs           | 17 ++--
 src/DotPulsar/Internal/PingPongHandler.cs          | 45 ++++++-----
 src/DotPulsar/Internal/PulsarStream.cs             | 29 ++++---
 src/DotPulsar/Internal/RequestResponseHandler.cs   |  6 +-
 src/DotPulsar/Internal/StateTaskCollection.cs      | 47 +++++------
 tests/DotPulsar.Tests/IntegrationFixture.cs        |  8 +-
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs    | 92 ++++++++++------------
 tests/DotPulsar.Tests/Internal/ProducerTests.cs    | 74 ++++++++---------
 tests/DotPulsar.Tests/Internal/ReaderTests.cs      | 91 ++++++++++-----------
 tests/DotPulsar.Tests/PulsarClientTests.cs         | 47 ++++++-----
 .../DotPulsar.Tests/TestOutputHelperExtensions.cs  | 36 +++++++++
 13 files changed, 256 insertions(+), 258 deletions(-)

diff --git a/src/DotPulsar/Internal/AsyncQueue.cs 
b/src/DotPulsar/Internal/AsyncQueue.cs
index 28d1cf1..043b753 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -73,17 +73,14 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, 
IDequeue<T>, IDisposable
 
     public void Dispose()
     {
-        lock (_lock)
-        {
-            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
-                return;
+        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+            return;
 
-            foreach (var pendingDequeue in _pendingDequeues)
-                pendingDequeue.Dispose();
+        foreach (var pendingDequeue in _pendingDequeues)
+            pendingDequeue.Dispose();
 
-            _pendingDequeues.Clear();
-            _queue.Clear();
-        }
+        _pendingDequeues.Clear();
+        _queue.Clear();
     }
 
     private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 5ffdb4e..9280f8a 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -53,7 +53,6 @@ public sealed class Connection : IConnection
     public static Connection Connect(
         IPulsarStream stream,
         IAuthentication? authentication,
-        CommandConnect commandConnect,
         TimeSpan keepAliveInterval,
         TimeSpan closeOnInactiveInterval)
     {
@@ -331,9 +330,11 @@ public sealed class Connection : IConnection
 
     private async Task ProcessIncomingFrames(CancellationToken 
cancellationToken)
     {
+        await Task.Yield();
+
         try
         {
-            await foreach (var frame in _stream.Frames(cancellationToken))
+            await foreach (var frame in 
_stream.Frames(cancellationToken).ConfigureAwait(false))
             {
                 var commandSize = frame.ReadUInt32(0, true);
                 var command = 
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 48dac09..1cd7e7c 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -27,7 +27,6 @@ using System.Threading.Tasks;
 
 public sealed class ConnectionPool : IConnectionPool
 {
-    private readonly AsyncLock _lock;
     private readonly CommandConnect _commandConnect;
     private readonly Uri _serviceUrl;
     private readonly Connector _connector;
@@ -49,7 +48,6 @@ public sealed class ConnectionPool : IConnectionPool
         TimeSpan keepAliveInterval,
         IAuthentication? authentication)
     {
-        _lock = new AsyncLock();
         _commandConnect = commandConnect;
         _serviceUrl = serviceUrl;
         _connector = connector;
@@ -66,8 +64,6 @@ public sealed class ConnectionPool : IConnectionPool
     {
         _cancellationTokenSource.Cancel();
 
-        await _lock.DisposeAsync().ConfigureAwait(false);
-
         foreach (var serviceUrl in _connections.Keys.ToArray())
         {
             await DisposeConnection(serviceUrl).ConfigureAwait(false);
@@ -145,13 +141,10 @@ public sealed class ConnectionPool : IConnectionPool
 
     private async ValueTask<Connection> GetConnection(PulsarUrl url, 
CancellationToken cancellationToken)
     {
-        using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-        {
-            if (_connections.TryGetValue(url, out var connection) && 
connection is not null)
-                return connection;
+        if (_connections.TryGetValue(url, out var connection) && connection is 
not null)
+            return connection;
 
-            return await EstablishNewConnection(url, 
cancellationToken).ConfigureAwait(false);
-        }
+        return await EstablishNewConnection(url, 
cancellationToken).ConfigureAwait(false);
     }
 
     private async Task<Connection> EstablishNewConnection(PulsarUrl url, 
CancellationToken cancellationToken)
@@ -162,11 +155,11 @@ public sealed class ConnectionPool : IConnectionPool
         if (url.ProxyThroughServiceUrl)
             commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
-        var connection = Connection.Connect(new PulsarStream(stream), 
_authentication, commandConnect, _keepAliveInterval, 
_closeInactiveConnectionsInterval);
-        _connections[url] = connection;
+        var connection = Connection.Connect(new PulsarStream(stream), 
_authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
         _ = 
connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t 
=> DisposeConnection(url));
         var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.Connected);
+        _connections[url] = connection;
         connection.MaxMessageSize = response.Connected.MaxMessageSize;
         return connection;
     }
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index c89a3fb..8feb502 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -23,19 +23,19 @@ using System.Threading.Tasks;
 
 public sealed class PingPongHandler : IState<PingPongHandlerState>, 
IAsyncDisposable
 {
+    private readonly CancellationTokenSource _cts;
     private readonly StateManager<PingPongHandlerState> _stateManager;
     private readonly TimeSpan _keepAliveInterval;
-    private readonly Timer _timer;
     private long _lastCommand;
     private bool _waitForPong;
 
     public PingPongHandler(TimeSpan keepAliveInterval)
     {
+        _cts = new CancellationTokenSource();
         _stateManager = new 
StateManager<PingPongHandlerState>(PingPongHandlerState.Active, 
PingPongHandlerState.TimedOut, PingPongHandlerState.Closed);
         _keepAliveInterval = keepAliveInterval;
-        _timer = new Timer(Watch);
-        _timer.Change(_keepAliveInterval, TimeSpan.Zero);
         _lastCommand = Stopwatch.GetTimestamp();
+        _ = Task.Factory.StartNew(() => Watch());
     }
 
     public void Incoming(BaseCommand.Type _)
@@ -44,13 +44,24 @@ public sealed class PingPongHandler : 
IState<PingPongHandlerState>, IAsyncDispos
         _waitForPong = false;
     }
 
-    private void Watch(object? state)
+    private TimeSpan GetElapsedTimeSinceLastCommand()
     {
-        try
+        var lastCommand = Interlocked.Read(ref _lastCommand);
+        var now = Stopwatch.GetTimestamp();
+        var elapsedTicks = now - lastCommand;
+        if (elapsedTicks > 0)
+            return TimeSpan.FromSeconds(elapsedTicks / Stopwatch.Frequency);
+        return TimeSpan.Zero;
+    }
+
+    private async void Watch()
+    {
+        var waitFor = _keepAliveInterval;
+
+        while (!_cts.IsCancellationRequested)
         {
-            var lastCommand = Interlocked.Read(ref _lastCommand);
-            var now = Stopwatch.GetTimestamp();
-            var elapsed = TimeSpan.FromSeconds((now - lastCommand) / 
Stopwatch.Frequency);
+            await Task.Delay(waitFor).ConfigureAwait(false);
+            var elapsed = GetElapsedTimeSinceLastCommand();
             if (elapsed > _keepAliveInterval)
             {
                 if (_waitForPong)
@@ -61,34 +72,22 @@ public sealed class PingPongHandler : 
IState<PingPongHandlerState>, IAsyncDispos
 
                 _waitForPong = true;
                 _stateManager.SetState(PingPongHandlerState.ThresholdExceeded);
-                _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+                waitFor = _keepAliveInterval;
             }
             else
             {
                 _stateManager.SetState(PingPongHandlerState.Active);
-                _timer.Change(_keepAliveInterval.Subtract(elapsed), 
TimeSpan.Zero);
+                waitFor = _keepAliveInterval.Subtract(elapsed);
             }
         }
-        catch
-        {
-            // Ignore
-        }
     }
 
-#if NETSTANDARD2_0
     public ValueTask DisposeAsync()
     {
-        _timer.Dispose();
+        _cts.Cancel();
         _stateManager.SetState(PingPongHandlerState.Closed);
         return new ValueTask();
     }
-#else
-    public async ValueTask DisposeAsync()
-    {
-        await _timer.DisposeAsync().ConfigureAwait(false);
-        _stateManager.SetState(PingPongHandlerState.Closed);
-    }
-#endif
 
     public bool IsFinalState() => _stateManager.IsFinalState();
 
diff --git a/src/DotPulsar/Internal/PulsarStream.cs 
b/src/DotPulsar/Internal/PulsarStream.cs
index 13488bc..a4f719b 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -36,8 +36,7 @@ public sealed class PulsarStream : IPulsarStream
 
     private readonly Stream _stream;
     private readonly ChunkingPipeline _pipeline;
-    private readonly PipeReader _reader;
-    private readonly PipeWriter _writer;
+    private readonly Pipe _pipe;
     private int _isDisposed;
 
     public PulsarStream(Stream stream)
@@ -45,9 +44,7 @@ public sealed class PulsarStream : IPulsarStream
         _stream = stream;
         _pipeline = new ChunkingPipeline(stream, ChunkSize);
         var options = new PipeOptions(pauseWriterThreshold: 
PauseAtMoreThan10Mb, resumeWriterThreshold: ResumeAt5MbOrLess);
-        var pipe = new Pipe(options);
-        _reader = pipe.Reader;
-        _writer = pipe.Writer;
+        _pipe = new Pipe(options);
     }
 
     public async Task Send(ReadOnlySequence<byte> sequence)
@@ -74,7 +71,7 @@ public sealed class PulsarStream : IPulsarStream
 
     private async Task FillPipe(CancellationToken cancellationToken)
     {
-        await Task.Yield();
+        var writer = _pipe.Writer;
 
         try
         {
@@ -83,7 +80,7 @@ public sealed class PulsarStream : IPulsarStream
 #endif
             while (true)
             {
-                var memory = _writer.GetMemory(84999);
+                var memory = writer.GetMemory(84999);
 #if NETSTANDARD2_0
                 var bytesRead = await _stream.ReadAsync(buffer, 0, 
buffer.Length, cancellationToken).ConfigureAwait(false);
                 new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
@@ -93,9 +90,9 @@ public sealed class PulsarStream : IPulsarStream
                 if (bytesRead == 0)
                     break;
 
-                _writer.Advance(bytesRead);
+                writer.Advance(bytesRead);
 
-                var result = await 
_writer.FlushAsync(cancellationToken).ConfigureAwait(false);
+                var result = await 
writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 
                 if (result.IsCompleted)
                     break;
@@ -107,7 +104,7 @@ public sealed class PulsarStream : IPulsarStream
         }
         finally
         {
-            await _writer.CompleteAsync().ConfigureAwait(false);
+            await writer.CompleteAsync().ConfigureAwait(false);
         }
     }
 
@@ -115,7 +112,9 @@ public sealed class PulsarStream : IPulsarStream
     {
         ThrowIfDisposed();
 
-        _ = FillPipe(cancellationToken);
+        _ = Task.Factory.StartNew(async () => await 
FillPipe(cancellationToken).ConfigureAwait(false));
+
+        var reader = _pipe.Reader;
 
         try
         {
@@ -125,7 +124,7 @@ public sealed class PulsarStream : IPulsarStream
             while (true)
             {
                 var minimumSize = FrameSizePrefix + frameSize;
-                var readResult = await _reader.ReadAtLeastAsync(minimumSize, 
cancellationToken).ConfigureAwait(false);
+                var readResult = await reader.ReadAtLeastAsync(minimumSize, 
cancellationToken).ConfigureAwait(false);
                 var buffer = readResult.Buffer;
 
                 while (true)
@@ -148,15 +147,15 @@ public sealed class PulsarStream : IPulsarStream
                     frameSize = UnknownFrameSize;
                 }
 
-                if (readResult.IsCompleted)
+                if (readResult.IsCompleted || readResult.IsCanceled)
                     break;
 
-                _reader.AdvanceTo(buffer.Start);
+                reader.AdvanceTo(buffer.Start);
             }
         }
         finally
         {
-            await _reader.CompleteAsync().ConfigureAwait(false);
+            await reader.CompleteAsync().ConfigureAwait(false);
         }
     }
 
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs 
b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 3ce8795..5433897 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -22,17 +22,19 @@ using System.Threading.Tasks;
 
 public sealed class RequestResponseHandler : IDisposable
 {
+    private readonly ConnectRequest _connectRequest;
     private readonly RequestId _requestId;
     private readonly Awaiter<IRequest, BaseCommand> _requests;
     private readonly EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>> 
_getResponseIdentifier;
 
     public RequestResponseHandler()
     {
+        _connectRequest = new ConnectRequest();
         _requestId = new RequestId();
         _requests = new Awaiter<IRequest, BaseCommand>();
 
         _getResponseIdentifier = new EnumLookup<BaseCommand.Type, 
Func<BaseCommand, IRequest>>(cmd => throw new Exception($"CommandType 
'{cmd.CommandType}' not supported as request/response type"));
-        _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => new 
ConnectRequest());
+        _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => 
_connectRequest);
         _getResponseIdentifier.Set(BaseCommand.Type.SendError, cmd => new 
SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId));
         _getResponseIdentifier.Set(BaseCommand.Type.SendReceipt, cmd => new 
SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId));
         _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd => 
StandardRequest.WithRequestId(cmd.ProducerSuccess.RequestId));
@@ -43,7 +45,7 @@ public sealed class RequestResponseHandler : IDisposable
         _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, 
cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
         _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, 
cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
         _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => 
StandardRequest.WithRequestId(cmd.Success.RequestId));
-        _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => 
!_requestId.IsPastInitialId() ? new ConnectRequest() : 
StandardRequest.WithRequestId(cmd.Error.RequestId));
+        _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => 
_requestId.IsPastInitialId() ? 
StandardRequest.WithRequestId(cmd.Error.RequestId) : _connectRequest);
     }
 
     public void Dispose()
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs 
b/src/DotPulsar/Internal/StateTaskCollection.cs
index ad17ae4..8ada892 100644
--- a/src/DotPulsar/Internal/StateTaskCollection.cs
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -38,60 +38,55 @@ public sealed class StateTaskCollection<TState> where 
TState : notnull
             node = _awaiters.AddFirst(new StateTask<TState>(state, changed));
         }
 
-        node.Value.CancelableCompletionSource.SetupCancellation(() => 
TaskWasCanceled(node), cancellationToken);
+        node.Value.CancelableCompletionSource.SetupCancellation(() => 
RemoveAndDispose(node), cancellationToken);
 
         return node.Value.CancelableCompletionSource.Task;
     }
 
-    public void CompleteTasksAwaiting(TState state)
+    public void CompleteTasksAwaiting(TState state) => SetState(state, true);
+
+    public void CompleteAllTasks(TState state) => SetState(state, false);
+
+    private LinkedListNode<StateTask<TState>>? GetFirst()
     {
         lock (_lock)
         {
-            var awaiter = _awaiters.First;
-
-            while (awaiter is not null)
-            {
-                var next = awaiter.Next;
-
-                if (awaiter.Value.IsAwaiting(state))
-                {
-                    _awaiters.Remove(awaiter);
-                    awaiter.Value.CancelableCompletionSource.SetResult(state);
-                    awaiter.Value.CancelableCompletionSource.Dispose();
-                }
-
-                awaiter = next;
-            }
+            return _awaiters.First;
         }
     }
 
-    public void CompleteAllTasks(TState state)
+    private void SetState(TState state, bool onlyAwaiting)
     {
-        lock (_lock)
+        var awaiter = GetFirst();
+
+        while (awaiter is not null)
         {
-            foreach (var awaiter in _awaiters)
+            var next = awaiter.Next;
+
+            if (!onlyAwaiting || awaiter.Value.IsAwaiting(state))
             {
-                awaiter.CancelableCompletionSource.SetResult(state);
-                awaiter.CancelableCompletionSource.Dispose();
+                awaiter.Value.CancelableCompletionSource.SetResult(state);
+                RemoveAndDispose(awaiter);
             }
 
-            _awaiters.Clear();
+            awaiter = next;
         }
     }
 
-    private void TaskWasCanceled(LinkedListNode<StateTask<TState>> node)
+    private void RemoveAndDispose(LinkedListNode<StateTask<TState>> node)
     {
         lock (_lock)
         {
             try
             {
                 _awaiters.Remove(node);
-                node.Value.Dispose();
             }
             catch
             {
                 // Ignore
             }
         }
+
+        node.Value.Dispose();
     }
 }
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs 
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index b77a399..1e3ad94 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar.Tests;
 
+using DotPulsar.Abstractions;
 using Ductus.FluentDocker.Builders;
 using Ductus.FluentDocker.Services;
 using Ductus.FluentDocker.Services.Extensions;
@@ -34,6 +35,8 @@ public class IntegrationFixture : IAsyncLifetime
     private readonly IMessageSink _messageSink;
     private readonly IContainerService _cluster;
 
+    private string? _token;
+
     public IntegrationFixture(IMessageSink messageSink)
     {
         _messageSink = messageSink;
@@ -62,7 +65,7 @@ public class IntegrationFixture : IAsyncLifetime
 
         _cluster = new Builder()
             .UseContainer()
-            .UseImage("apachepulsar/pulsar:3.1.0")
+            .UseImage("apachepulsar/pulsar:3.1.1")
             .WithEnvironment(environmentVariables)
             .ExposePort(Port)
             .Command("/bin/bash -c", arguments)
@@ -73,6 +76,8 @@ public class IntegrationFixture : IAsyncLifetime
 
     public Uri ServiceUrl { get; private set; }
 
+    public IAuthentication Authentication => AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_token!));
+
     public Task DisposeAsync()
     {
         _cluster.Dispose();
@@ -90,6 +95,7 @@ public class IntegrationFixture : IAsyncLifetime
         var endpoint = _cluster.ToHostExposedEndpoint($"{Port}/tcp");
         _messageSink.OnMessage(new DiagnosticMessage($"Endpoint opened at 
{endpoint}"));
         ServiceUrl = new Uri($"pulsar://localhost:{endpoint.Port}");
+        _token = CreateToken(Timeout.InfiniteTimeSpan);
         return Task.CompletedTask;
     }
 
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 3cd6d8b..5984019 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -26,13 +26,15 @@ using Xunit;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
-public class ConsumerTests
+public class ConsumerTests : IDisposable
 {
+    private readonly CancellationTokenSource _cts;
     private readonly IntegrationFixture _fixture;
     private readonly ITestOutputHelper _testOutputHelper;
 
     public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper 
testOutputHelper)
     {
+        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
         _fixture = fixture;
         _testOutputHelper = testOutputHelper;
     }
@@ -42,10 +44,10 @@ public class ConsumerTests
     {
         //Arrange
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client);
+        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
 
         //Act
-        var actual = await consumer.GetLastMessageId();
+        var actual = await consumer.GetLastMessageId(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(MessageId.Earliest);
@@ -55,7 +57,7 @@ public class ConsumerTests
     public async Task 
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        var topicName = CreateTopicName();
+        var topicName = _fixture.CreateTopic();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -65,7 +67,7 @@ public class ConsumerTests
         MessageId expected = null!;
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             if (i >= 5)
             {
                 expected = messageId;
@@ -73,7 +75,7 @@ public class ConsumerTests
         }
 
         //Act
-        var actual = await consumer.GetLastMessageId();
+        var actual = await consumer.GetLastMessageId(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -83,15 +85,14 @@ public class ConsumerTests
     public async Task 
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
     {
         //Arrange
-        var topicName = CreateTopicName();
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic(topicName, partitions);
+        var topicName = _fixture.CreatePartitionedTopic(partitions);
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
consumer.GetLastMessageId().AsTask());
+        var exception = await Record.ExceptionAsync(() => 
consumer.GetLastMessageId(_cts.Token).AsTask());
 
         //Assert
         exception.Should().BeOfType<NotSupportedException>();
@@ -101,7 +102,7 @@ public class ConsumerTests
     public async Task 
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        var topicName = CreateTopicName();
+        var topicName = _fixture.CreateTopic();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -111,7 +112,7 @@ public class ConsumerTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             if (i >= 5)
             {
                 expected.Add(messageId);
@@ -119,7 +120,7 @@ public class ConsumerTests
         }
 
         //Act
-        var actual = await consumer.GetLastMessageIds();
+        var actual = await consumer.GetLastMessageIds(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -129,10 +130,9 @@ public class ConsumerTests
     public async Task 
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
     {
         //Arrange
-        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic(topicName, partitions);
+        var topicName = _fixture.CreatePartitionedTopic(partitions);
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
@@ -141,7 +141,7 @@ public class ConsumerTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             if (i >= 3)
             {
                 expected.Add(messageId);
@@ -149,7 +149,7 @@ public class ConsumerTests
         }
 
         //Act
-        var actual = await consumer.GetLastMessageIds();
+        var actual = await consumer.GetLastMessageIds(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -160,11 +160,11 @@ public class ConsumerTests
     {
         //Arrange
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client);
+        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
         var expected = new List<MessageId>() { MessageId.Earliest };
 
         //Act
-        var actual = await consumer.GetLastMessageIds();
+        var actual = await consumer.GetLastMessageIds(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -174,18 +174,16 @@ public class ConsumerTests
     public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        var topicName = CreateTopicName();
-        const int numberOfMessages = 10000;
+        var topicName = _fixture.CreateTopic();
+        const int numberOfMessages = 1000;
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
-        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
         //Act
-        var produced = await ProduceMessages(producer, numberOfMessages, 
"test-message", cts.Token);
-        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
cts.Token);
+        var produced = await ProduceMessages(producer, numberOfMessages, 
"test-message", _cts.Token);
+        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
_cts.Token);
 
         //Assert
         consumed.Should().BeEquivalentTo(produced);
@@ -195,21 +193,18 @@ public class ConsumerTests
     public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        var topicName = CreateTopicName();
         const int numberOfMessages = 1000;
         const int partitions = 3;
 
-        _fixture.CreatePartitionedTopic(topicName, partitions);
+        var topicName = _fixture.CreatePartitionedTopic(partitions);
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
-        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
         //Act
-        var produced = await ProduceMessages(producer, numberOfMessages, 
"test-message", cts.Token);
-        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
cts.Token);
+        var produced = await ProduceMessages(producer, numberOfMessages, 
"test-message", _cts.Token);
+        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
_cts.Token);
 
         //Assert
         consumed.Should().BeEquivalentTo(produced);
@@ -228,9 +223,9 @@ public class ConsumerTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = CreateConsumer(client);
+        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
 
-        var receiveTask = consumer.Receive().AsTask();
+        var receiveTask = consumer.Receive(_cts.Token).AsTask();
         semaphoreSlim.Release();
 
         //Act
@@ -251,12 +246,12 @@ public class ConsumerTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = CreateConsumer(client);
+        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
 
-        await consumer.OnStateChangeTo(ConsumerState.Faulted);
+        await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
consumer.Receive().AsTask());
+        var exception = await Record.ExceptionAsync(() => 
consumer.Receive(_cts.Token).AsTask());
 
         //Assert
         exception.Should().BeOfType<ConsumerFaultedException>();
@@ -293,36 +288,29 @@ public class ConsumerTests
         return messageIds;
     }
 
-    private void LogState(ConsumerStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The consumer for topic 
'{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
-
-    private void LogState(ProducerStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The producer for topic 
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
-
-    private static string CreateTopicName() => 
$"persistent://public/default/{Guid.NewGuid():N}";
-    private static string CreateConsumerName() => 
$"consumer-{Guid.NewGuid():N}";
     private static string CreateSubscriptionName() => 
$"subscription-{Guid.NewGuid():N}";
 
-    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string? topicName = null)
+    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string topicName)
         => pulsarClient.NewProducer(Schema.String)
-        .Topic(topicName is null ? CreateTopicName() : topicName)
-        .StateChangedHandler(LogState)
+        .Topic(topicName)
+        .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
-    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string? topicName = null)
+    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string topicName)
         => pulsarClient.NewConsumer(Schema.String)
-        .ConsumerName(CreateConsumerName())
         .InitialPosition(SubscriptionInitialPosition.Earliest)
         .SubscriptionName(CreateSubscriptionName())
-        .Topic(topicName is null ? CreateTopicName() : topicName)
-        .StateChangedHandler(LogState)
+        .Topic(topicName)
+        .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
     private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()
-        .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-        .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: 
{ec.Exception}"))
+        .Authentication(_fixture.Authentication)
+        .ExceptionHandler(_testOutputHelper.Log)
         .ServiceUrl(_fixture.ServiceUrl)
         .Build();
+
+    public void Dispose() => _cts.Dispose();
 }
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs 
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 1aa7010..686eb5d 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -25,14 +25,15 @@ using Xunit;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
-public class ProducerTests
+public class ProducerTests : IDisposable
 {
-    private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(30);
+    private readonly CancellationTokenSource _cts;
     private readonly IntegrationFixture _fixture;
     private readonly ITestOutputHelper _testOutputHelper;
 
     public ProducerTests(IntegrationFixture fixture, ITestOutputHelper 
outputHelper)
     {
+        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
         _fixture = fixture;
         _testOutputHelper = outputHelper;
     }
@@ -48,8 +49,8 @@ public class ProducerTests
         await using var consumer = CreateConsumer(client, topicName);
 
         //Act
-        var messageId = await producer.Send(content);
-        var message = await consumer.Receive();
+        var messageId = await producer.Send(content, _cts.Token);
+        var message = await consumer.Receive(_cts.Token);
 
         //Assert
         message.MessageId.Should().Be(messageId);
@@ -74,10 +75,10 @@ public class ProducerTests
         }
 
         //Act
-        await producer.SendChannel.Send(content, SetMessageId);
+        await producer.SendChannel.Send(content, SetMessageId, _cts.Token);
         producer.SendChannel.Complete();
-        await producer.SendChannel.Completion();
-        var message = await consumer.Receive();
+        await producer.SendChannel.Completion(_cts.Token);
+        var message = await consumer.Receive(_cts.Token);
         var expected = await idTask.Task;
 
         //Assert
@@ -110,21 +111,20 @@ public class ProducerTests
     {
         //Arrange
         var topicName = _fixture.CreateTopic();
-        using var cts = new CancellationTokenSource(TestTimeout);
         await using var client = CreateClient();
         await using var producer1 = CreateProducer(client, topicName, 
accessModeForProducer1);
-        await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
+        await producer1.OnStateChangeTo(ProducerState.Connected, _cts.Token);
 
         //Act
         await using var producer2 = CreateProducer(client, topicName, 
accessModeForProducer2);
 
         if (accessModeForProducer2 == ProducerAccessMode.ExclusiveWithFencing) 
// We need to send a message to trigger the state change
         {
-            await producer2.OnStateChangeTo(ProducerState.Connected, 
cts.Token);
+            await producer2.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
             try
             {
-                _ = producer1.Send("test", default);
+                _ = producer1.Send("test", _cts.Token);
             }
             catch
             {
@@ -132,8 +132,8 @@ public class ProducerTests
             }
         }
 
-        var actualStateForProducer1 = await 
producer1.OnStateChangeTo(expectedStateForProducer1, cts.Token);
-        var actualStateForProducer2 = await 
producer2.OnStateChangeTo(expectedStateForProducer2, cts.Token);
+        var actualStateForProducer1 = await 
producer1.OnStateChangeTo(expectedStateForProducer1, _cts.Token);
+        var actualStateForProducer2 = await 
producer2.OnStateChangeTo(expectedStateForProducer2, _cts.Token);
 
         //Assert
         actualStateForProducer1.Should().Be(expectedStateForProducer1);
@@ -167,8 +167,8 @@ public class ProducerTests
             for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
             {
                 var message = $"{content}-{i}-{msgIndex}";
-                _ = await producer.Send(message);
-                _testOutputHelper.WriteLine($"Sent a message: {message}");
+                _ = await producer.Send(message, _cts.Token);
+                _testOutputHelper.Log($"Sent a message: {message}");
             }
         }
 
@@ -179,7 +179,7 @@ public class ProducerTests
 
             for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
             {
-                var message = await consumer.Receive();
+                var message = await consumer.Receive(_cts.Token);
                 message.Value().Should().Be($"{content}-{i}-{msgIndex}");
             }
         }
@@ -201,14 +201,14 @@ public class ProducerTests
         for (var i = 0; i < partitions; ++i)
         {
             consumers.Add(CreateConsumer(client, 
$"{topicName}-partition-{i}"));
-            await producer.Send($"{content}-{i}");
-            _testOutputHelper.WriteLine($"Sent a message to consumer [{i}]");
+            await producer.Send($"{content}-{i}", _cts.Token);
+            _testOutputHelper.Log($"Sent a message to consumer [{i}]");
         }
 
         //Assert
         for (var i = 0; i < partitions; ++i)
         {
-            (await 
consumers[i].Receive()).Value().Should().Be($"{content}-{i}");
+            (await 
consumers[i].Receive(_cts.Token)).Value().Should().Be($"{content}-{i}");
         }
     }
 
@@ -223,11 +223,9 @@ public class ProducerTests
         await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
-        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
         //Act
-        var produced = await ProduceMessages(producer, numberOfMessages, 
cts.Token);
-        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
cts.Token);
+        var produced = await ProduceMessages(producer, numberOfMessages, 
_cts.Token);
+        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
_cts.Token);
 
         //Assert
         var foundNonNegativeOne = false;
@@ -256,11 +254,9 @@ public class ProducerTests
         await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
 
-        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
         //Act
-        var produced = await ProduceMessages(producer, numberOfMessages, 
cts.Token);
-        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
cts.Token);
+        var produced = await ProduceMessages(producer, numberOfMessages, 
_cts.Token);
+        var consumed = await ConsumeMessages(consumer, numberOfMessages, 
_cts.Token);
 
         //Assert
         var foundNonNegativeOne = false;
@@ -309,13 +305,6 @@ public class ProducerTests
         return messageIds;
     }
 
-    private void LogState(ConsumerStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The consumer for topic 
'{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
-
-    private void LogState(ProducerStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The producer for topic 
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
-
-    private static string CreateConsumerName() => 
$"consumer-{Guid.NewGuid():N}";
     private static string CreateSubscriptionName() => 
$"subscription-{Guid.NewGuid():N}";
 
     private IProducer<string> CreateProducer(
@@ -325,23 +314,24 @@ public class ProducerTests
         => pulsarClient.NewProducer(Schema.String)
         .Topic(topicName)
         .ProducerAccessMode(producerAccessMode)
-        .StateChangedHandler(LogState)
+        .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
     private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string topicName)
         => pulsarClient.NewConsumer(Schema.String)
-        .ConsumerName(CreateConsumerName())
         .InitialPosition(SubscriptionInitialPosition.Earliest)
         .SubscriptionName(CreateSubscriptionName())
         .Topic(topicName)
-        .StateChangedHandler(LogState)
+        .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
     private IPulsarClient CreateClient()
         => PulsarClient
-            .Builder()
-            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-            .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: 
{ec.Exception}"))
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Build();
+        .Builder()
+        .Authentication(_fixture.Authentication)
+        .ExceptionHandler(_testOutputHelper.Log)
+        .ServiceUrl(_fixture.ServiceUrl)
+        .Build();
+
+    public void Dispose() => _cts.Dispose();
 }
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs 
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 968f26d..ed8a620 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -26,13 +26,15 @@ using Xunit;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
-public class ReaderTests
+public class ReaderTests : IDisposable
 {
+    private readonly CancellationTokenSource _cts;
     private readonly IntegrationFixture _fixture;
     private readonly ITestOutputHelper _testOutputHelper;
 
     public ReaderTests(IntegrationFixture fixture, ITestOutputHelper 
testOutputHelper)
     {
+        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
         _fixture = fixture;
         _testOutputHelper = testOutputHelper;
     }
@@ -42,10 +44,10 @@ public class ReaderTests
     {
         //Arrange
         await using var client = CreateClient();
-        await using var reader = CreateReader(client, MessageId.Earliest);
+        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
 
         //Act
-        var actual = await reader.GetLastMessageId();
+        var actual = await reader.GetLastMessageId(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(MessageId.Earliest);
@@ -55,7 +57,7 @@ public class ReaderTests
     public async Task 
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
     {
         //Arrange
-        var topicName = CreateTopicName();
+        var topicName = _fixture.CreateTopic();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -65,7 +67,7 @@ public class ReaderTests
         MessageId expected = null!;
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             if (i >= 5)
             {
                 expected = messageId;
@@ -73,7 +75,7 @@ public class ReaderTests
         }
 
         //Act
-        var actual = await reader.GetLastMessageId();
+        var actual = await reader.GetLastMessageId(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -83,15 +85,14 @@ public class ReaderTests
     public async Task 
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
     {
         //Arrange
-        var topicName = CreateTopicName();
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic(topicName, partitions);
+        var topicName = _fixture.CreatePartitionedTopic(partitions);
 
         await using var client = CreateClient();
         await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
reader.GetLastMessageId().AsTask());
+        var exception = await Record.ExceptionAsync(() => 
reader.GetLastMessageId(_cts.Token).AsTask());
 
         //Assert
         exception.Should().BeOfType<NotSupportedException>();
@@ -102,11 +103,11 @@ public class ReaderTests
     {
         //Arrange
         await using var client = CreateClient();
-        await using var reader = CreateReader(client, MessageId.Earliest);
+        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
         var expected = new List<MessageId>() { MessageId.Earliest };
 
         //Act
-        var actual = await reader.GetLastMessageIds();
+        var actual = await reader.GetLastMessageIds(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -116,7 +117,7 @@ public class ReaderTests
     public async Task 
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        var topicName = CreateTopicName();
+        var topicName = _fixture.CreateTopic();
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -126,14 +127,14 @@ public class ReaderTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             if (i >= 5)
             {
                 expected.Add(messageId);
             }
         }
         //Act
-        var actual = await reader.GetLastMessageIds();
+        var actual = await reader.GetLastMessageIds(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -143,10 +144,9 @@ public class ReaderTests
     public async Task 
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdsFromPartitions()
     {
         //Arrange
-        var topicName = CreateTopicName();
         const int numberOfMessages = 6;
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic(topicName, partitions);
+        var topicName = _fixture.CreatePartitionedTopic(partitions);
 
         await using var client = CreateClient();
         await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
@@ -155,7 +155,7 @@ public class ReaderTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             if (i >= 3)
             {
                 expected.Add(messageId);
@@ -163,7 +163,7 @@ public class ReaderTests
         }
 
         //Act
-        var actual = await reader.GetLastMessageIds();
+        var actual = await reader.GetLastMessageIds(_cts.Token);
 
         //Assert
         actual.Should().BeEquivalentTo(expected);
@@ -173,7 +173,7 @@ public class ReaderTests
     public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        var topicName = CreateTopicName();
+        var topicName = _fixture.CreateTopic();
         const int numberOfMessages = 10;
 
         await using var client = CreateClient();
@@ -183,7 +183,7 @@ public class ReaderTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             expected.Add(messageId);
         }
 
@@ -191,7 +191,7 @@ public class ReaderTests
         var actual = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await reader.Receive();
+            var messageId = await reader.Receive(_cts.Token);
             actual.Add(messageId.MessageId);
         }
 
@@ -203,10 +203,9 @@ public class ReaderTests
     public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        var topicName = CreateTopicName();
         const int numberOfMessages = 50;
         const int partitions = 3;
-        _fixture.CreatePartitionedTopic(topicName, partitions);
+        var topicName = _fixture.CreatePartitionedTopic(partitions);
 
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
@@ -215,7 +214,7 @@ public class ReaderTests
         var expected = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await producer.Send("test-message");
+            var messageId = await producer.Send("test-message", _cts.Token);
             expected.Add(messageId);
         }
 
@@ -223,7 +222,7 @@ public class ReaderTests
         var actual = new List<MessageId>();
         for (var i = 0; i < numberOfMessages; i++)
         {
-            var messageId = await reader.Receive();
+            var messageId = await reader.Receive(_cts.Token);
             actual.Add(messageId.MessageId);
         }
 
@@ -244,9 +243,9 @@ public class ReaderTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = CreateReader(client, MessageId.Earliest, 
CreateTopicName());
+        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
 
-        var receiveTask = reader.Receive().AsTask();
+        var receiveTask = reader.Receive(_cts.Token).AsTask();
         semaphoreSlim.Release();
 
         //Act
@@ -267,43 +266,37 @@ public class ReaderTests
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = CreateReader(client, MessageId.Earliest, 
CreateTopicName());
+        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
 
-        await reader.OnStateChangeTo(ReaderState.Faulted);
+        await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
reader.Receive().AsTask());
+        var exception = await Record.ExceptionAsync(() => 
reader.Receive(_cts.Token).AsTask());
 
         //Assert
         exception.Should().BeOfType<ReaderFaultedException>();
     }
 
-    private void LogState(ReaderStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The reader for topic 
'{stateChange.Reader.Topic}' changed state to '{stateChange.ReaderState}'");
-
-    private void LogState(ProducerStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The producer for topic 
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
-
-    private static string CreateTopicName() => 
$"persistent://public/default/{Guid.NewGuid():N}";
-
-    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string? topicName = null)
+    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string topicName)
         => pulsarClient.NewProducer(Schema.String)
-        .Topic(topicName is null ? CreateTopicName() : topicName)
-        .StateChangedHandler(LogState)
+        .Topic(topicName)
+        .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
-    private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId 
messageId, string? topicName = null)
+    private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId 
messageId, string topicName)
         => pulsarClient.NewReader(Schema.String)
         .StartMessageId(messageId)
-        .Topic(topicName is null ? CreateTopicName() : topicName)
-        .StateChangedHandler(LogState)
+        .Topic(topicName)
+        .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
     private IPulsarClient CreateClient()
         => PulsarClient
-            .Builder()
-            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-            .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: 
{ec.Exception}"))
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Build();
+        .Builder()
+        .Authentication(_fixture.Authentication)
+        .ExceptionHandler(_testOutputHelper.Log)
+        .ServiceUrl(_fixture.ServiceUrl)
+        .Build();
+
+    public void Dispose() => _cts.Dispose();
 }
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs 
b/tests/DotPulsar.Tests/PulsarClientTests.cs
index c5956bc..1277455 100644
--- a/tests/DotPulsar.Tests/PulsarClientTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -25,15 +25,15 @@ using Xunit;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
-public class PulsarClientTests
+public class PulsarClientTests : IDisposable
 {
-    private const string MyTopic = "persistent://public/default/mytopic";
-
+    private readonly CancellationTokenSource _cts;
     private readonly IntegrationFixture _fixture;
     private readonly ITestOutputHelper _testOutputHelper;
 
     public PulsarClientTests(IntegrationFixture fixture, ITestOutputHelper 
outputHelper)
     {
+        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
         _fixture = fixture;
         _testOutputHelper = outputHelper;
     }
@@ -46,8 +46,8 @@ public class PulsarClientTests
         await using var producer = CreateProducer(client);
 
         // Act
-        var exception = await Record.ExceptionAsync(() => 
producer.Send("Test").AsTask());
-        var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+        var exception = await Record.ExceptionAsync(() => 
producer.Send("Test", _cts.Token).AsTask());
+        var state = await producer.OnStateChangeTo(ProducerState.Faulted, 
_cts.Token);
 
         // Assert
         exception.Should().BeOfType<ProducerFaultedException>();
@@ -64,16 +64,16 @@ public class PulsarClientTests
             if (throwException)
                 throw new Exception();
             var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
-            _testOutputHelper.WriteLine($"Received token: {token}");
+            _testOutputHelper.Log($"Received token: {token}");
             return ValueTask.FromResult(token);
         });
 
         await using var producer = CreateProducer(client);
 
         // Act
-        _ = await producer.Send("Test"); // Make sure we have a working 
connection
+        _ = await producer.Send("Test", _cts.Token); // Make sure we have a 
working connection
         throwException = true;
-        var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+        var state = await producer.OnStateChangeTo(ProducerState.Faulted, 
_cts.Token);
 
         // Assert
         state.Should().Be(ProducerState.Faulted);
@@ -92,8 +92,8 @@ public class PulsarClientTests
         await using var producer = CreateProducer(client);
 
         // Act
-        var exception = await Record.ExceptionAsync(() => 
producer.Send("Test").AsTask());
-        var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+        var exception = await Record.ExceptionAsync(() => 
producer.Send("Test", _cts.Token).AsTask());
+        var state = await producer.OnStateChangeTo(ProducerState.Faulted, 
_cts.Token);
 
         // Assert
         exception.Should().BeOfType<ProducerFaultedException>();
@@ -115,16 +115,16 @@ public class PulsarClientTests
                 tcs.SetResult();
 
             var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
-            _testOutputHelper.WriteLine($"Received token: {token}");
+            _testOutputHelper.Log($"Received token: {token}");
             return ValueTask.FromResult(token);
         });
 
         await using var producer = CreateProducer(client);
 
         // Act
-        _ = await producer.Send("Test"); // Make sure we have a working 
connection
+        _ = await producer.Send("Test", _cts.Token); // Make sure we have a 
working connection
         await tcs.Task;
-        var state = await producer.OnStateChangeTo(ProducerState.Connected);
+        var state = await producer.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
         // Assert
         state.Should().Be(ProducerState.Connected);
@@ -132,19 +132,18 @@ public class PulsarClientTests
 
     private IPulsarClient CreateClient(Func<CancellationToken, 
ValueTask<string>> tokenSupplier)
         => PulsarClient
-            .Builder()
-            .Authentication(AuthenticationFactory.Token(tokenSupplier))
-            .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: 
{ec.Exception}"))
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Build();
+        .Builder()
+        .Authentication(AuthenticationFactory.Token(tokenSupplier))
+        .ExceptionHandler(_testOutputHelper.Log)
+        .ServiceUrl(_fixture.ServiceUrl)
+        .Build();
 
     private IProducer<string> CreateProducer(IPulsarClient client)
         => client
-            .NewProducer(Schema.String)
-            .Topic(MyTopic)
-            .StateChangedHandler(LogState)
-            .Create();
+        .NewProducer(Schema.String)
+        .Topic(_fixture.CreateTopic())
+        .StateChangedHandler(_testOutputHelper.Log)
+        .Create();
 
-    private void LogState(ProducerStateChanged stateChange)
-        => _testOutputHelper.WriteLine($"The producer for topic 
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+    public void Dispose() => _cts.Dispose();
 }
diff --git a/tests/DotPulsar.Tests/TestOutputHelperExtensions.cs 
b/tests/DotPulsar.Tests/TestOutputHelperExtensions.cs
new file mode 100644
index 0000000..ece54a0
--- /dev/null
+++ b/tests/DotPulsar.Tests/TestOutputHelperExtensions.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests;
+
+using System;
+using Xunit.Abstractions;
+
+public static class TestOutputHelperExtensions
+{
+    public static void Log(this ITestOutputHelper helper, string message)
+        => helper.WriteLine($"{DateTime.UtcNow:HH:mm:ss}: {message}");
+
+    public static void Log(this ITestOutputHelper helper, ExceptionContext 
context)
+        => helper.Log($"PulsarClient got an exception: {context.Exception}");
+
+    public static void Log(this ITestOutputHelper helper, ConsumerStateChanged 
stateChange)
+        => helper.Log($"The consumer for topic '{stateChange.Consumer.Topic}' 
changed state to '{stateChange.ConsumerState}'");
+
+    public static void Log(this ITestOutputHelper helper, ProducerStateChanged 
stateChange)
+        => helper.Log($"The producer for topic '{stateChange.Producer.Topic}' 
changed state to '{stateChange.ProducerState}'");
+
+    public static void Log(this ITestOutputHelper helper, ReaderStateChanged 
stateChange)
+        => helper.Log($"The reader for topic '{stateChange.Reader.Topic}' 
changed state to '{stateChange.ReaderState}'");
+}

Reply via email to