This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7497c3 Optimizing tests by reusing the token and also ensuring all
integration tests are run with a cancellation token (1 minute timeout). Found
and fixed some more race conditions and reran the tests 30 times without any
failures.
c7497c3 is described below
commit c7497c33dfde0732ef11c4de83322419dc55ad47
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Oct 27 11:52:02 2023 +0200
Optimizing tests by reusing the token and also ensuring all integration
tests are run with a cancellation token (1 minute timeout).
Found and fixed some more race conditions and reran the tests 30 times
without any failures.
---
src/DotPulsar/Internal/AsyncQueue.cs | 17 ++--
src/DotPulsar/Internal/Connection.cs | 5 +-
src/DotPulsar/Internal/ConnectionPool.cs | 17 ++--
src/DotPulsar/Internal/PingPongHandler.cs | 45 ++++++-----
src/DotPulsar/Internal/PulsarStream.cs | 29 ++++---
src/DotPulsar/Internal/RequestResponseHandler.cs | 6 +-
src/DotPulsar/Internal/StateTaskCollection.cs | 47 +++++------
tests/DotPulsar.Tests/IntegrationFixture.cs | 8 +-
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 92 ++++++++++------------
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 74 ++++++++---------
tests/DotPulsar.Tests/Internal/ReaderTests.cs | 91 ++++++++++-----------
tests/DotPulsar.Tests/PulsarClientTests.cs | 47 ++++++-----
.../DotPulsar.Tests/TestOutputHelperExtensions.cs | 36 +++++++++
13 files changed, 256 insertions(+), 258 deletions(-)
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
index 28d1cf1..043b753 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -73,17 +73,14 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
public void Dispose()
{
- lock (_lock)
- {
- if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
- return;
+ if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+ return;
- foreach (var pendingDequeue in _pendingDequeues)
- pendingDequeue.Dispose();
+ foreach (var pendingDequeue in _pendingDequeues)
+ pendingDequeue.Dispose();
- _pendingDequeues.Clear();
- _queue.Clear();
- }
+ _pendingDequeues.Clear();
+ _queue.Clear();
}
private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 5ffdb4e..9280f8a 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -53,7 +53,6 @@ public sealed class Connection : IConnection
public static Connection Connect(
IPulsarStream stream,
IAuthentication? authentication,
- CommandConnect commandConnect,
TimeSpan keepAliveInterval,
TimeSpan closeOnInactiveInterval)
{
@@ -331,9 +330,11 @@ public sealed class Connection : IConnection
private async Task ProcessIncomingFrames(CancellationToken
cancellationToken)
{
+ await Task.Yield();
+
try
{
- await foreach (var frame in _stream.Frames(cancellationToken))
+ await foreach (var frame in
_stream.Frames(cancellationToken).ConfigureAwait(false))
{
var commandSize = frame.ReadUInt32(0, true);
var command =
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 48dac09..1cd7e7c 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -27,7 +27,6 @@ using System.Threading.Tasks;
public sealed class ConnectionPool : IConnectionPool
{
- private readonly AsyncLock _lock;
private readonly CommandConnect _commandConnect;
private readonly Uri _serviceUrl;
private readonly Connector _connector;
@@ -49,7 +48,6 @@ public sealed class ConnectionPool : IConnectionPool
TimeSpan keepAliveInterval,
IAuthentication? authentication)
{
- _lock = new AsyncLock();
_commandConnect = commandConnect;
_serviceUrl = serviceUrl;
_connector = connector;
@@ -66,8 +64,6 @@ public sealed class ConnectionPool : IConnectionPool
{
_cancellationTokenSource.Cancel();
- await _lock.DisposeAsync().ConfigureAwait(false);
-
foreach (var serviceUrl in _connections.Keys.ToArray())
{
await DisposeConnection(serviceUrl).ConfigureAwait(false);
@@ -145,13 +141,10 @@ public sealed class ConnectionPool : IConnectionPool
private async ValueTask<Connection> GetConnection(PulsarUrl url,
CancellationToken cancellationToken)
{
- using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
- {
- if (_connections.TryGetValue(url, out var connection) &&
connection is not null)
- return connection;
+ if (_connections.TryGetValue(url, out var connection) && connection is
not null)
+ return connection;
- return await EstablishNewConnection(url,
cancellationToken).ConfigureAwait(false);
- }
+ return await EstablishNewConnection(url,
cancellationToken).ConfigureAwait(false);
}
private async Task<Connection> EstablishNewConnection(PulsarUrl url,
CancellationToken cancellationToken)
@@ -162,11 +155,11 @@ public sealed class ConnectionPool : IConnectionPool
if (url.ProxyThroughServiceUrl)
commandConnect = WithProxyToBroker(commandConnect, url.Logical);
- var connection = Connection.Connect(new PulsarStream(stream),
_authentication, commandConnect, _keepAliveInterval,
_closeInactiveConnectionsInterval);
- _connections[url] = connection;
+ var connection = Connection.Connect(new PulsarStream(stream),
_authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
_ =
connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t
=> DisposeConnection(url));
var response = await connection.Send(commandConnect,
cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
+ _connections[url] = connection;
connection.MaxMessageSize = response.Connected.MaxMessageSize;
return connection;
}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index c89a3fb..8feb502 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -23,19 +23,19 @@ using System.Threading.Tasks;
public sealed class PingPongHandler : IState<PingPongHandlerState>,
IAsyncDisposable
{
+ private readonly CancellationTokenSource _cts;
private readonly StateManager<PingPongHandlerState> _stateManager;
private readonly TimeSpan _keepAliveInterval;
- private readonly Timer _timer;
private long _lastCommand;
private bool _waitForPong;
public PingPongHandler(TimeSpan keepAliveInterval)
{
+ _cts = new CancellationTokenSource();
_stateManager = new
StateManager<PingPongHandlerState>(PingPongHandlerState.Active,
PingPongHandlerState.TimedOut, PingPongHandlerState.Closed);
_keepAliveInterval = keepAliveInterval;
- _timer = new Timer(Watch);
- _timer.Change(_keepAliveInterval, TimeSpan.Zero);
_lastCommand = Stopwatch.GetTimestamp();
+ _ = Task.Factory.StartNew(() => Watch());
}
public void Incoming(BaseCommand.Type _)
@@ -44,13 +44,24 @@ public sealed class PingPongHandler : IState<PingPongHandlerState>, IAsyncDispos
_waitForPong = false;
}
- private void Watch(object? state)
+ private TimeSpan GetElapsedTimeSinceLastCommand()
{
- try
+ var lastCommand = Interlocked.Read(ref _lastCommand);
+ var now = Stopwatch.GetTimestamp();
+ var elapsedTicks = now - lastCommand;
+ if (elapsedTicks > 0)
+ return TimeSpan.FromSeconds(elapsedTicks / Stopwatch.Frequency);
+ return TimeSpan.Zero;
+ }
+
+ private async void Watch()
+ {
+ var waitFor = _keepAliveInterval;
+
+ while (!_cts.IsCancellationRequested)
{
- var lastCommand = Interlocked.Read(ref _lastCommand);
- var now = Stopwatch.GetTimestamp();
- var elapsed = TimeSpan.FromSeconds((now - lastCommand) /
Stopwatch.Frequency);
+ await Task.Delay(waitFor).ConfigureAwait(false);
+ var elapsed = GetElapsedTimeSinceLastCommand();
if (elapsed > _keepAliveInterval)
{
if (_waitForPong)
@@ -61,34 +72,22 @@ public sealed class PingPongHandler : IState<PingPongHandlerState>, IAsyncDispos
_waitForPong = true;
_stateManager.SetState(PingPongHandlerState.ThresholdExceeded);
- _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+ waitFor = _keepAliveInterval;
}
else
{
_stateManager.SetState(PingPongHandlerState.Active);
- _timer.Change(_keepAliveInterval.Subtract(elapsed),
TimeSpan.Zero);
+ waitFor = _keepAliveInterval.Subtract(elapsed);
}
}
- catch
- {
- // Ignore
- }
}
-#if NETSTANDARD2_0
public ValueTask DisposeAsync()
{
- _timer.Dispose();
+ _cts.Cancel();
_stateManager.SetState(PingPongHandlerState.Closed);
return new ValueTask();
}
-#else
- public async ValueTask DisposeAsync()
- {
- await _timer.DisposeAsync().ConfigureAwait(false);
- _stateManager.SetState(PingPongHandlerState.Closed);
- }
-#endif
public bool IsFinalState() => _stateManager.IsFinalState();
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 13488bc..a4f719b 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -36,8 +36,7 @@ public sealed class PulsarStream : IPulsarStream
private readonly Stream _stream;
private readonly ChunkingPipeline _pipeline;
- private readonly PipeReader _reader;
- private readonly PipeWriter _writer;
+ private readonly Pipe _pipe;
private int _isDisposed;
public PulsarStream(Stream stream)
@@ -45,9 +44,7 @@ public sealed class PulsarStream : IPulsarStream
_stream = stream;
_pipeline = new ChunkingPipeline(stream, ChunkSize);
var options = new PipeOptions(pauseWriterThreshold:
PauseAtMoreThan10Mb, resumeWriterThreshold: ResumeAt5MbOrLess);
- var pipe = new Pipe(options);
- _reader = pipe.Reader;
- _writer = pipe.Writer;
+ _pipe = new Pipe(options);
}
public async Task Send(ReadOnlySequence<byte> sequence)
@@ -74,7 +71,7 @@ public sealed class PulsarStream : IPulsarStream
private async Task FillPipe(CancellationToken cancellationToken)
{
- await Task.Yield();
+ var writer = _pipe.Writer;
try
{
@@ -83,7 +80,7 @@ public sealed class PulsarStream : IPulsarStream
#endif
while (true)
{
- var memory = _writer.GetMemory(84999);
+ var memory = writer.GetMemory(84999);
#if NETSTANDARD2_0
var bytesRead = await _stream.ReadAsync(buffer, 0,
buffer.Length, cancellationToken).ConfigureAwait(false);
new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
@@ -93,9 +90,9 @@ public sealed class PulsarStream : IPulsarStream
if (bytesRead == 0)
break;
- _writer.Advance(bytesRead);
+ writer.Advance(bytesRead);
- var result = await
_writer.FlushAsync(cancellationToken).ConfigureAwait(false);
+ var result = await
writer.FlushAsync(cancellationToken).ConfigureAwait(false);
if (result.IsCompleted)
break;
@@ -107,7 +104,7 @@ public sealed class PulsarStream : IPulsarStream
}
finally
{
- await _writer.CompleteAsync().ConfigureAwait(false);
+ await writer.CompleteAsync().ConfigureAwait(false);
}
}
@@ -115,7 +112,9 @@ public sealed class PulsarStream : IPulsarStream
{
ThrowIfDisposed();
- _ = FillPipe(cancellationToken);
+ _ = Task.Factory.StartNew(async () => await
FillPipe(cancellationToken).ConfigureAwait(false));
+
+ var reader = _pipe.Reader;
try
{
@@ -125,7 +124,7 @@ public sealed class PulsarStream : IPulsarStream
while (true)
{
var minimumSize = FrameSizePrefix + frameSize;
- var readResult = await _reader.ReadAtLeastAsync(minimumSize,
cancellationToken).ConfigureAwait(false);
+ var readResult = await reader.ReadAtLeastAsync(minimumSize,
cancellationToken).ConfigureAwait(false);
var buffer = readResult.Buffer;
while (true)
@@ -148,15 +147,15 @@ public sealed class PulsarStream : IPulsarStream
frameSize = UnknownFrameSize;
}
- if (readResult.IsCompleted)
+ if (readResult.IsCompleted || readResult.IsCanceled)
break;
- _reader.AdvanceTo(buffer.Start);
+ reader.AdvanceTo(buffer.Start);
}
}
finally
{
- await _reader.CompleteAsync().ConfigureAwait(false);
+ await reader.CompleteAsync().ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 3ce8795..5433897 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -22,17 +22,19 @@ using System.Threading.Tasks;
public sealed class RequestResponseHandler : IDisposable
{
+ private readonly ConnectRequest _connectRequest;
private readonly RequestId _requestId;
private readonly Awaiter<IRequest, BaseCommand> _requests;
private readonly EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>>
_getResponseIdentifier;
public RequestResponseHandler()
{
+ _connectRequest = new ConnectRequest();
_requestId = new RequestId();
_requests = new Awaiter<IRequest, BaseCommand>();
_getResponseIdentifier = new EnumLookup<BaseCommand.Type,
Func<BaseCommand, IRequest>>(cmd => throw new Exception($"CommandType
'{cmd.CommandType}' not supported as request/response type"));
- _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => new
ConnectRequest());
+ _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd =>
_connectRequest);
_getResponseIdentifier.Set(BaseCommand.Type.SendError, cmd => new
SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId));
_getResponseIdentifier.Set(BaseCommand.Type.SendReceipt, cmd => new
SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId));
_getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd =>
StandardRequest.WithRequestId(cmd.ProducerSuccess.RequestId));
@@ -43,7 +45,7 @@ public sealed class RequestResponseHandler : IDisposable
_getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse,
cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse,
cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
_getResponseIdentifier.Set(BaseCommand.Type.Success, cmd =>
StandardRequest.WithRequestId(cmd.Success.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd =>
!_requestId.IsPastInitialId() ? new ConnectRequest() :
StandardRequest.WithRequestId(cmd.Error.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd =>
_requestId.IsPastInitialId() ?
StandardRequest.WithRequestId(cmd.Error.RequestId) : _connectRequest);
}
public void Dispose()
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
index ad17ae4..8ada892 100644
--- a/src/DotPulsar/Internal/StateTaskCollection.cs
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -38,60 +38,55 @@ public sealed class StateTaskCollection<TState> where TState : notnull
node = _awaiters.AddFirst(new StateTask<TState>(state, changed));
}
- node.Value.CancelableCompletionSource.SetupCancellation(() =>
TaskWasCanceled(node), cancellationToken);
+ node.Value.CancelableCompletionSource.SetupCancellation(() =>
RemoveAndDispose(node), cancellationToken);
return node.Value.CancelableCompletionSource.Task;
}
- public void CompleteTasksAwaiting(TState state)
+ public void CompleteTasksAwaiting(TState state) => SetState(state, true);
+
+ public void CompleteAllTasks(TState state) => SetState(state, false);
+
+ private LinkedListNode<StateTask<TState>>? GetFirst()
{
lock (_lock)
{
- var awaiter = _awaiters.First;
-
- while (awaiter is not null)
- {
- var next = awaiter.Next;
-
- if (awaiter.Value.IsAwaiting(state))
- {
- _awaiters.Remove(awaiter);
- awaiter.Value.CancelableCompletionSource.SetResult(state);
- awaiter.Value.CancelableCompletionSource.Dispose();
- }
-
- awaiter = next;
- }
+ return _awaiters.First;
}
}
- public void CompleteAllTasks(TState state)
+ private void SetState(TState state, bool onlyAwaiting)
{
- lock (_lock)
+ var awaiter = GetFirst();
+
+ while (awaiter is not null)
{
- foreach (var awaiter in _awaiters)
+ var next = awaiter.Next;
+
+ if (!onlyAwaiting || awaiter.Value.IsAwaiting(state))
{
- awaiter.CancelableCompletionSource.SetResult(state);
- awaiter.CancelableCompletionSource.Dispose();
+ awaiter.Value.CancelableCompletionSource.SetResult(state);
+ RemoveAndDispose(awaiter);
}
- _awaiters.Clear();
+ awaiter = next;
}
}
- private void TaskWasCanceled(LinkedListNode<StateTask<TState>> node)
+ private void RemoveAndDispose(LinkedListNode<StateTask<TState>> node)
{
lock (_lock)
{
try
{
_awaiters.Remove(node);
- node.Value.Dispose();
}
catch
{
// Ignore
}
}
+
+ node.Value.Dispose();
}
}
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs
index b77a399..1e3ad94 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -14,6 +14,7 @@
namespace DotPulsar.Tests;
+using DotPulsar.Abstractions;
using Ductus.FluentDocker.Builders;
using Ductus.FluentDocker.Services;
using Ductus.FluentDocker.Services.Extensions;
@@ -34,6 +35,8 @@ public class IntegrationFixture : IAsyncLifetime
private readonly IMessageSink _messageSink;
private readonly IContainerService _cluster;
+ private string? _token;
+
public IntegrationFixture(IMessageSink messageSink)
{
_messageSink = messageSink;
@@ -62,7 +65,7 @@ public class IntegrationFixture : IAsyncLifetime
_cluster = new Builder()
.UseContainer()
- .UseImage("apachepulsar/pulsar:3.1.0")
+ .UseImage("apachepulsar/pulsar:3.1.1")
.WithEnvironment(environmentVariables)
.ExposePort(Port)
.Command("/bin/bash -c", arguments)
@@ -73,6 +76,8 @@ public class IntegrationFixture : IAsyncLifetime
public Uri ServiceUrl { get; private set; }
+ public IAuthentication Authentication => AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_token!));
+
public Task DisposeAsync()
{
_cluster.Dispose();
@@ -90,6 +95,7 @@ public class IntegrationFixture : IAsyncLifetime
var endpoint = _cluster.ToHostExposedEndpoint($"{Port}/tcp");
_messageSink.OnMessage(new DiagnosticMessage($"Endpoint opened at
{endpoint}"));
ServiceUrl = new Uri($"pulsar://localhost:{endpoint.Port}");
+ _token = CreateToken(Timeout.InfiniteTimeSpan);
return Task.CompletedTask;
}
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 3cd6d8b..5984019 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -26,13 +26,15 @@ using Xunit;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
-public class ConsumerTests
+public class ConsumerTests : IDisposable
{
+ private readonly CancellationTokenSource _cts;
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
{
+ _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
_fixture = fixture;
_testOutputHelper = testOutputHelper;
}
@@ -42,10 +44,10 @@ public class ConsumerTests
{
//Arrange
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client);
+ await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
//Act
- var actual = await consumer.GetLastMessageId();
+ var actual = await consumer.GetLastMessageId(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(MessageId.Earliest);
@@ -55,7 +57,7 @@ public class ConsumerTests
public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var topicName = CreateTopicName();
+ var topicName = _fixture.CreateTopic();
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -65,7 +67,7 @@ public class ConsumerTests
MessageId expected = null!;
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 5)
{
expected = messageId;
@@ -73,7 +75,7 @@ public class ConsumerTests
}
//Act
- var actual = await consumer.GetLastMessageId();
+ var actual = await consumer.GetLastMessageId(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -83,15 +85,14 @@ public class ConsumerTests
public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
{
//Arrange
- var topicName = CreateTopicName();
const int partitions = 3;
- _fixture.CreatePartitionedTopic(topicName, partitions);
+ var topicName = _fixture.CreatePartitionedTopic(partitions);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
//Act
- var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
+ var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId(_cts.Token).AsTask());
//Assert
exception.Should().BeOfType<NotSupportedException>();
@@ -101,7 +102,7 @@ public class ConsumerTests
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var topicName = CreateTopicName();
+ var topicName = _fixture.CreateTopic();
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -111,7 +112,7 @@ public class ConsumerTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 5)
{
expected.Add(messageId);
@@ -119,7 +120,7 @@ public class ConsumerTests
}
//Act
- var actual = await consumer.GetLastMessageIds();
+ var actual = await consumer.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -129,10 +130,9 @@ public class ConsumerTests
public async Task
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
{
//Arrange
- var topicName = CreateTopicName();
const int numberOfMessages = 6;
const int partitions = 3;
- _fixture.CreatePartitionedTopic(topicName, partitions);
+ var topicName = _fixture.CreatePartitionedTopic(partitions);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
@@ -141,7 +141,7 @@ public class ConsumerTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 3)
{
expected.Add(messageId);
@@ -149,7 +149,7 @@ public class ConsumerTests
}
//Act
- var actual = await consumer.GetLastMessageIds();
+ var actual = await consumer.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -160,11 +160,11 @@ public class ConsumerTests
{
//Arrange
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client);
+ await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
var expected = new List<MessageId>() { MessageId.Earliest };
//Act
- var actual = await consumer.GetLastMessageIds();
+ var actual = await consumer.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -174,18 +174,16 @@ public class ConsumerTests
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- var topicName = CreateTopicName();
- const int numberOfMessages = 10000;
+ var topicName = _fixture.CreateTopic();
+ const int numberOfMessages = 1000;
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
- var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
//Act
- var produced = await ProduceMessages(producer, numberOfMessages,
"test-message", cts.Token);
- var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
+ var produced = await ProduceMessages(producer, numberOfMessages,
"test-message", _cts.Token);
+ var consumed = await ConsumeMessages(consumer, numberOfMessages,
_cts.Token);
//Assert
consumed.Should().BeEquivalentTo(produced);
@@ -195,21 +193,18 @@ public class ConsumerTests
public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- var topicName = CreateTopicName();
const int numberOfMessages = 1000;
const int partitions = 3;
- _fixture.CreatePartitionedTopic(topicName, partitions);
+ var topicName = _fixture.CreatePartitionedTopic(partitions);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
- var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
//Act
- var produced = await ProduceMessages(producer, numberOfMessages,
"test-message", cts.Token);
- var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
+ var produced = await ProduceMessages(producer, numberOfMessages,
"test-message", _cts.Token);
+ var consumed = await ConsumeMessages(consumer, numberOfMessages,
_cts.Token);
//Assert
consumed.Should().BeEquivalentTo(produced);
@@ -228,9 +223,9 @@ public class ConsumerTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = CreateConsumer(client);
+ await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
- var receiveTask = consumer.Receive().AsTask();
+ var receiveTask = consumer.Receive(_cts.Token).AsTask();
semaphoreSlim.Release();
//Act
@@ -251,12 +246,12 @@ public class ConsumerTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = CreateConsumer(client);
+ await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
- await consumer.OnStateChangeTo(ConsumerState.Faulted);
+ await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(() =>
consumer.Receive().AsTask());
+ var exception = await Record.ExceptionAsync(() =>
consumer.Receive(_cts.Token).AsTask());
//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
@@ -293,36 +288,29 @@ public class ConsumerTests
return messageIds;
}
- private void LogState(ConsumerStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The consumer for topic
'{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
-
- private void LogState(ProducerStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
-
- private static string CreateTopicName() =>
$"persistent://public/default/{Guid.NewGuid():N}";
- private static string CreateConsumerName() =>
$"consumer-{Guid.NewGuid():N}";
private static string CreateSubscriptionName() =>
$"subscription-{Guid.NewGuid():N}";
- private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string? topicName = null)
+ private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName)
=> pulsarClient.NewProducer(Schema.String)
- .Topic(topicName is null ? CreateTopicName() : topicName)
- .StateChangedHandler(LogState)
+ .Topic(topicName)
+ .StateChangedHandler(_testOutputHelper.Log)
.Create();
- private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string? topicName = null)
+ private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string topicName)
=> pulsarClient.NewConsumer(Schema.String)
- .ConsumerName(CreateConsumerName())
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName(CreateSubscriptionName())
- .Topic(topicName is null ? CreateTopicName() : topicName)
- .StateChangedHandler(LogState)
+ .Topic(topicName)
+ .StateChangedHandler(_testOutputHelper.Log)
.Create();
private IPulsarClient CreateClient()
=> PulsarClient
.Builder()
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception:
{ec.Exception}"))
+ .Authentication(_fixture.Authentication)
+ .ExceptionHandler(_testOutputHelper.Log)
.ServiceUrl(_fixture.ServiceUrl)
.Build();
+
+ public void Dispose() => _cts.Dispose();
}
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 1aa7010..686eb5d 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -25,14 +25,15 @@ using Xunit;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
-public class ProducerTests
+public class ProducerTests : IDisposable
{
- private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(30);
+ private readonly CancellationTokenSource _cts;
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
public ProducerTests(IntegrationFixture fixture, ITestOutputHelper
outputHelper)
{
+ _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
_fixture = fixture;
_testOutputHelper = outputHelper;
}
@@ -48,8 +49,8 @@ public class ProducerTests
await using var consumer = CreateConsumer(client, topicName);
//Act
- var messageId = await producer.Send(content);
- var message = await consumer.Receive();
+ var messageId = await producer.Send(content, _cts.Token);
+ var message = await consumer.Receive(_cts.Token);
//Assert
message.MessageId.Should().Be(messageId);
@@ -74,10 +75,10 @@ public class ProducerTests
}
//Act
- await producer.SendChannel.Send(content, SetMessageId);
+ await producer.SendChannel.Send(content, SetMessageId, _cts.Token);
producer.SendChannel.Complete();
- await producer.SendChannel.Completion();
- var message = await consumer.Receive();
+ await producer.SendChannel.Completion(_cts.Token);
+ var message = await consumer.Receive(_cts.Token);
var expected = await idTask.Task;
//Assert
@@ -110,21 +111,20 @@ public class ProducerTests
{
//Arrange
var topicName = _fixture.CreateTopic();
- using var cts = new CancellationTokenSource(TestTimeout);
await using var client = CreateClient();
await using var producer1 = CreateProducer(client, topicName,
accessModeForProducer1);
- await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token);
+ await producer1.OnStateChangeTo(ProducerState.Connected, _cts.Token);
//Act
await using var producer2 = CreateProducer(client, topicName,
accessModeForProducer2);
if (accessModeForProducer2 == ProducerAccessMode.ExclusiveWithFencing)
// We need to send a message to trigger the state change
{
- await producer2.OnStateChangeTo(ProducerState.Connected,
cts.Token);
+ await producer2.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
try
{
- _ = producer1.Send("test", default);
+ _ = producer1.Send("test", _cts.Token);
}
catch
{
@@ -132,8 +132,8 @@ public class ProducerTests
}
}
- var actualStateForProducer1 = await
producer1.OnStateChangeTo(expectedStateForProducer1, cts.Token);
- var actualStateForProducer2 = await
producer2.OnStateChangeTo(expectedStateForProducer2, cts.Token);
+ var actualStateForProducer1 = await
producer1.OnStateChangeTo(expectedStateForProducer1, _cts.Token);
+ var actualStateForProducer2 = await
producer2.OnStateChangeTo(expectedStateForProducer2, _cts.Token);
//Assert
actualStateForProducer1.Should().Be(expectedStateForProducer1);
@@ -167,8 +167,8 @@ public class ProducerTests
for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
{
var message = $"{content}-{i}-{msgIndex}";
- _ = await producer.Send(message);
- _testOutputHelper.WriteLine($"Sent a message: {message}");
+ _ = await producer.Send(message, _cts.Token);
+ _testOutputHelper.Log($"Sent a message: {message}");
}
}
@@ -179,7 +179,7 @@ public class ProducerTests
for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
{
- var message = await consumer.Receive();
+ var message = await consumer.Receive(_cts.Token);
message.Value().Should().Be($"{content}-{i}-{msgIndex}");
}
}
@@ -201,14 +201,14 @@ public class ProducerTests
for (var i = 0; i < partitions; ++i)
{
consumers.Add(CreateConsumer(client,
$"{topicName}-partition-{i}"));
- await producer.Send($"{content}-{i}");
- _testOutputHelper.WriteLine($"Sent a message to consumer [{i}]");
+ await producer.Send($"{content}-{i}", _cts.Token);
+ _testOutputHelper.Log($"Sent a message to consumer [{i}]");
}
//Assert
for (var i = 0; i < partitions; ++i)
{
- (await
consumers[i].Receive()).Value().Should().Be($"{content}-{i}");
+ (await
consumers[i].Receive(_cts.Token)).Value().Should().Be($"{content}-{i}");
}
}
@@ -223,11 +223,9 @@ public class ProducerTests
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
- var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
//Act
- var produced = await ProduceMessages(producer, numberOfMessages,
cts.Token);
- var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
+ var produced = await ProduceMessages(producer, numberOfMessages,
_cts.Token);
+ var consumed = await ConsumeMessages(consumer, numberOfMessages,
_cts.Token);
//Assert
var foundNonNegativeOne = false;
@@ -256,11 +254,9 @@ public class ProducerTests
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
- var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
-
//Act
- var produced = await ProduceMessages(producer, numberOfMessages,
cts.Token);
- var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
+ var produced = await ProduceMessages(producer, numberOfMessages,
_cts.Token);
+ var consumed = await ConsumeMessages(consumer, numberOfMessages,
_cts.Token);
//Assert
var foundNonNegativeOne = false;
@@ -309,13 +305,6 @@ public class ProducerTests
return messageIds;
}
- private void LogState(ConsumerStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The consumer for topic '{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
-
- private void LogState(ProducerStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
-
- private static string CreateConsumerName() =>
$"consumer-{Guid.NewGuid():N}";
private static string CreateSubscriptionName() =>
$"subscription-{Guid.NewGuid():N}";
private IProducer<string> CreateProducer(
@@ -325,23 +314,24 @@ public class ProducerTests
=> pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
.ProducerAccessMode(producerAccessMode)
- .StateChangedHandler(LogState)
+ .StateChangedHandler(_testOutputHelper.Log)
.Create();
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string topicName)
=> pulsarClient.NewConsumer(Schema.String)
- .ConsumerName(CreateConsumerName())
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName(CreateSubscriptionName())
.Topic(topicName)
- .StateChangedHandler(LogState)
+ .StateChangedHandler(_testOutputHelper.Log)
.Create();
private IPulsarClient CreateClient()
=> PulsarClient
- .Builder()
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}"))
- .ServiceUrl(_fixture.ServiceUrl)
- .Build();
+ .Builder()
+ .Authentication(_fixture.Authentication)
+ .ExceptionHandler(_testOutputHelper.Log)
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Build();
+
+ public void Dispose() => _cts.Dispose();
}
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 968f26d..ed8a620 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -26,13 +26,15 @@ using Xunit;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
-public class ReaderTests
+public class ReaderTests : IDisposable
{
+ private readonly CancellationTokenSource _cts;
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
public ReaderTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
{
+ _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
_fixture = fixture;
_testOutputHelper = testOutputHelper;
}
@@ -42,10 +44,10 @@ public class ReaderTests
{
//Arrange
await using var client = CreateClient();
- await using var reader = CreateReader(client, MessageId.Earliest);
+ await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
//Act
- var actual = await reader.GetLastMessageId();
+ var actual = await reader.GetLastMessageId(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(MessageId.Earliest);
@@ -55,7 +57,7 @@ public class ReaderTests
public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
{
//Arrange
- var topicName = CreateTopicName();
+ var topicName = _fixture.CreateTopic();
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -65,7 +67,7 @@ public class ReaderTests
MessageId expected = null!;
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 5)
{
expected = messageId;
@@ -73,7 +75,7 @@ public class ReaderTests
}
//Act
- var actual = await reader.GetLastMessageId();
+ var actual = await reader.GetLastMessageId(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -83,15 +85,14 @@ public class ReaderTests
public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
{
//Arrange
- var topicName = CreateTopicName();
const int partitions = 3;
- _fixture.CreatePartitionedTopic(topicName, partitions);
+ var topicName = _fixture.CreatePartitionedTopic(partitions);
await using var client = CreateClient();
await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
//Act
- var exception = await Record.ExceptionAsync(() =>
reader.GetLastMessageId().AsTask());
+ var exception = await Record.ExceptionAsync(() =>
reader.GetLastMessageId(_cts.Token).AsTask());
//Assert
exception.Should().BeOfType<NotSupportedException>();
@@ -102,11 +103,11 @@ public class ReaderTests
{
//Arrange
await using var client = CreateClient();
- await using var reader = CreateReader(client, MessageId.Earliest);
+ await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
var expected = new List<MessageId>() { MessageId.Earliest };
//Act
- var actual = await reader.GetLastMessageIds();
+ var actual = await reader.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -116,7 +117,7 @@ public class ReaderTests
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var topicName = CreateTopicName();
+ var topicName = _fixture.CreateTopic();
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -126,14 +127,14 @@ public class ReaderTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 5)
{
expected.Add(messageId);
}
}
//Act
- var actual = await reader.GetLastMessageIds();
+ var actual = await reader.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -143,10 +144,9 @@ public class ReaderTests
public async Task
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdsFromPartitions()
{
//Arrange
- var topicName = CreateTopicName();
const int numberOfMessages = 6;
const int partitions = 3;
- _fixture.CreatePartitionedTopic(topicName, partitions);
+ var topicName = _fixture.CreatePartitionedTopic(partitions);
await using var client = CreateClient();
await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
@@ -155,7 +155,7 @@ public class ReaderTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 3)
{
expected.Add(messageId);
@@ -163,7 +163,7 @@ public class ReaderTests
}
//Act
- var actual = await reader.GetLastMessageIds();
+ var actual = await reader.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
@@ -173,7 +173,7 @@ public class ReaderTests
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- var topicName = CreateTopicName();
+ var topicName = _fixture.CreateTopic();
const int numberOfMessages = 10;
await using var client = CreateClient();
@@ -183,7 +183,7 @@ public class ReaderTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
expected.Add(messageId);
}
@@ -191,7 +191,7 @@ public class ReaderTests
var actual = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await reader.Receive();
+ var messageId = await reader.Receive(_cts.Token);
actual.Add(messageId.MessageId);
}
@@ -203,10 +203,9 @@ public class ReaderTests
public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- var topicName = CreateTopicName();
const int numberOfMessages = 50;
const int partitions = 3;
- _fixture.CreatePartitionedTopic(topicName, partitions);
+ var topicName = _fixture.CreatePartitionedTopic(partitions);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
@@ -215,7 +214,7 @@ public class ReaderTests
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await producer.Send("test-message");
+ var messageId = await producer.Send("test-message", _cts.Token);
expected.Add(messageId);
}
@@ -223,7 +222,7 @@ public class ReaderTests
var actual = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
- var messageId = await reader.Receive();
+ var messageId = await reader.Receive(_cts.Token);
actual.Add(messageId.MessageId);
}
@@ -244,9 +243,9 @@ public class ReaderTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = CreateReader(client, MessageId.Earliest,
CreateTopicName());
+ await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
- var receiveTask = reader.Receive().AsTask();
+ var receiveTask = reader.Receive(_cts.Token).AsTask();
semaphoreSlim.Release();
//Act
@@ -267,43 +266,37 @@ public class ReaderTests
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = CreateReader(client, MessageId.Earliest,
CreateTopicName());
+ await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
- await reader.OnStateChangeTo(ReaderState.Faulted);
+ await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(() =>
reader.Receive().AsTask());
+ var exception = await Record.ExceptionAsync(() =>
reader.Receive(_cts.Token).AsTask());
//Assert
exception.Should().BeOfType<ReaderFaultedException>();
}
- private void LogState(ReaderStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The reader for topic '{stateChange.Reader.Topic}' changed state to '{stateChange.ReaderState}'");
-
- private void LogState(ProducerStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
-
- private static string CreateTopicName() =>
$"persistent://public/default/{Guid.NewGuid():N}";
-
- private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string? topicName = null)
+ private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName)
=> pulsarClient.NewProducer(Schema.String)
- .Topic(topicName is null ? CreateTopicName() : topicName)
- .StateChangedHandler(LogState)
+ .Topic(topicName)
+ .StateChangedHandler(_testOutputHelper.Log)
.Create();
- private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId
messageId, string? topicName = null)
+ private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId
messageId, string topicName)
=> pulsarClient.NewReader(Schema.String)
.StartMessageId(messageId)
- .Topic(topicName is null ? CreateTopicName() : topicName)
- .StateChangedHandler(LogState)
+ .Topic(topicName)
+ .StateChangedHandler(_testOutputHelper.Log)
.Create();
private IPulsarClient CreateClient()
=> PulsarClient
- .Builder()
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}"))
- .ServiceUrl(_fixture.ServiceUrl)
- .Build();
+ .Builder()
+ .Authentication(_fixture.Authentication)
+ .ExceptionHandler(_testOutputHelper.Log)
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Build();
+
+ public void Dispose() => _cts.Dispose();
}
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs b/tests/DotPulsar.Tests/PulsarClientTests.cs
index c5956bc..1277455 100644
--- a/tests/DotPulsar.Tests/PulsarClientTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -25,15 +25,15 @@ using Xunit;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
-public class PulsarClientTests
+public class PulsarClientTests : IDisposable
{
- private const string MyTopic = "persistent://public/default/mytopic";
-
+ private readonly CancellationTokenSource _cts;
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
public PulsarClientTests(IntegrationFixture fixture, ITestOutputHelper
outputHelper)
{
+ _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
_fixture = fixture;
_testOutputHelper = outputHelper;
}
@@ -46,8 +46,8 @@ public class PulsarClientTests
await using var producer = CreateProducer(client);
// Act
- var exception = await Record.ExceptionAsync(() =>
producer.Send("Test").AsTask());
- var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+ var exception = await Record.ExceptionAsync(() =>
producer.Send("Test", _cts.Token).AsTask());
+ var state = await producer.OnStateChangeTo(ProducerState.Faulted,
_cts.Token);
// Assert
exception.Should().BeOfType<ProducerFaultedException>();
@@ -64,16 +64,16 @@ public class PulsarClientTests
if (throwException)
throw new Exception();
var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
- _testOutputHelper.WriteLine($"Received token: {token}");
+ _testOutputHelper.Log($"Received token: {token}");
return ValueTask.FromResult(token);
});
await using var producer = CreateProducer(client);
// Act
- _ = await producer.Send("Test"); // Make sure we have a working connection
+ _ = await producer.Send("Test", _cts.Token); // Make sure we have a working connection
throwException = true;
- var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+ var state = await producer.OnStateChangeTo(ProducerState.Faulted,
_cts.Token);
// Assert
state.Should().Be(ProducerState.Faulted);
@@ -92,8 +92,8 @@ public class PulsarClientTests
await using var producer = CreateProducer(client);
// Act
- var exception = await Record.ExceptionAsync(() =>
producer.Send("Test").AsTask());
- var state = await producer.OnStateChangeTo(ProducerState.Faulted);
+ var exception = await Record.ExceptionAsync(() =>
producer.Send("Test", _cts.Token).AsTask());
+ var state = await producer.OnStateChangeTo(ProducerState.Faulted,
_cts.Token);
// Assert
exception.Should().BeOfType<ProducerFaultedException>();
@@ -115,16 +115,16 @@ public class PulsarClientTests
tcs.SetResult();
var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
- _testOutputHelper.WriteLine($"Received token: {token}");
+ _testOutputHelper.Log($"Received token: {token}");
return ValueTask.FromResult(token);
});
await using var producer = CreateProducer(client);
// Act
- _ = await producer.Send("Test"); // Make sure we have a working connection
+ _ = await producer.Send("Test", _cts.Token); // Make sure we have a working connection
await tcs.Task;
- var state = await producer.OnStateChangeTo(ProducerState.Connected);
+ var state = await producer.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
// Assert
state.Should().Be(ProducerState.Connected);
@@ -132,19 +132,18 @@ public class PulsarClientTests
private IPulsarClient CreateClient(Func<CancellationToken,
ValueTask<string>> tokenSupplier)
=> PulsarClient
- .Builder()
- .Authentication(AuthenticationFactory.Token(tokenSupplier))
- .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}"))
- .ServiceUrl(_fixture.ServiceUrl)
- .Build();
+ .Builder()
+ .Authentication(AuthenticationFactory.Token(tokenSupplier))
+ .ExceptionHandler(_testOutputHelper.Log)
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Build();
private IProducer<string> CreateProducer(IPulsarClient client)
=> client
- .NewProducer(Schema.String)
- .Topic(MyTopic)
- .StateChangedHandler(LogState)
- .Create();
+ .NewProducer(Schema.String)
+ .Topic(_fixture.CreateTopic())
+ .StateChangedHandler(_testOutputHelper.Log)
+ .Create();
- private void LogState(ProducerStateChanged stateChange)
- => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+ public void Dispose() => _cts.Dispose();
}
diff --git a/tests/DotPulsar.Tests/TestOutputHelperExtensions.cs b/tests/DotPulsar.Tests/TestOutputHelperExtensions.cs
new file mode 100644
index 0000000..ece54a0
--- /dev/null
+++ b/tests/DotPulsar.Tests/TestOutputHelperExtensions.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests;
+
+using System;
+using Xunit.Abstractions;
+
+public static class TestOutputHelperExtensions
+{
+ public static void Log(this ITestOutputHelper helper, string message)
+ => helper.WriteLine($"{DateTime.UtcNow:HH:mm:ss}: {message}");
+
+ public static void Log(this ITestOutputHelper helper, ExceptionContext context)
+ => helper.Log($"PulsarClient got an exception: {context.Exception}");
+
+ public static void Log(this ITestOutputHelper helper, ConsumerStateChanged stateChange)
+ => helper.Log($"The consumer for topic '{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
+
+ public static void Log(this ITestOutputHelper helper, ProducerStateChanged stateChange)
+ => helper.Log($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
+ public static void Log(this ITestOutputHelper helper, ReaderStateChanged stateChange)
+ => helper.Log($"The reader for topic '{stateChange.Reader.Topic}' changed state to '{stateChange.ReaderState}'");
+}