This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c40f55d Migrated from fluent docker to testcontainers (#197)
c40f55d is described below
commit c40f55dda722825e2c34853084ec6cb21b9be9ff
Author: entvex <[email protected]>
AuthorDate: Fri Dec 22 14:16:07 2023 +0100
Migrated from fluent docker to testcontainers (#197)
* Migrated from fluent docker to testcontainers
* Made improvements to IntegrationFixture.cs according to feedback.
* IntegrationFixture get better CancellationToken support.
Implemented CancellationToken support for methods and enabled
Testcontainers to dynamically map to a random port on the host.
Tests dependent on the functionality within IntegrationFixture now include
cancellationToken where necessary.
* Fixed warning and indents
* Made a DiagnosticMessage more clear.
---
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +-
tests/DotPulsar.Tests/IntegrationFixture.cs | 123 ++++++++++++------------
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 16 ++-
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 16 ++-
tests/DotPulsar.Tests/Internal/ReaderTests.cs | 16 ++-
tests/DotPulsar.Tests/PulsarClientTests.cs | 25 +++--
6 files changed, 97 insertions(+), 101 deletions(-)
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 450dc44..9ee0ddf 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -19,12 +19,12 @@
<PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" />
<PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" />
<PackageReference Include="DotNetZip" Version="1.16.0" />
- <PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="IronSnappy" Version="1.3.1" />
<PackageReference Include="K4os.Compression.LZ4" Version="1.3.6" />
<PackageReference Include="NSubstitute" Version="5.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
+ <PackageReference Include="Testcontainers" Version="3.6.0" />
<PackageReference Include="xunit" Version="2.6.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.5">
<PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index b6a019b..001cd8a 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -14,11 +14,9 @@
namespace DotPulsar.Tests;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
using DotPulsar.Abstractions;
-using Ductus.FluentDocker.Builders;
-using Ductus.FluentDocker.Services;
-using Ductus.FluentDocker.Services.Extensions;
-using System;
using Xunit.Abstractions;
using Xunit.Sdk;
@@ -28,122 +26,127 @@ public class IntegrationFixture : IAsyncLifetime
private const string SecretKeyPath = "/pulsar/secret.key";
private const string UserName = "test-user";
private const int Port = 6650;
+ private readonly CancellationTokenSource _cts;
private readonly IMessageSink _messageSink;
- private readonly IContainerService _cluster;
+ private readonly IContainer _cluster;
private string? _token;
public IntegrationFixture(IMessageSink messageSink)
{
_messageSink = messageSink;
+ _cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));
- var environmentVariables = new[]
+ var environmentVariables = new Dictionary<string, string>()
{
- $"PULSAR_PREFIX_tokenSecretKey=file://{SecretKeyPath}",
- "PULSAR_PREFIX_authenticationRefreshCheckSeconds=5",
- $"superUserRoles={UserName}",
- "authenticationEnabled=true",
- "authorizationEnabled=true",
-
"authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken",
- "authenticateOriginalAuthData=false",
- $"brokerClientAuthenticationPlugin={AuthenticationPlugin}",
- $"CLIENT_PREFIX_authPlugin={AuthenticationPlugin}",
+ { "PULSAR_PREFIX_tokenSecretKey", $"file://{SecretKeyPath}" },
+ { "PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5" },
+ { "superUserRoles", UserName },
+ { "authenticationEnabled", "true" },
+ { "authorizationEnabled", "true" },
+ { "authenticationProviders",
"org.apache.pulsar.broker.authentication.AuthenticationProviderToken" },
+ { "authenticateOriginalAuthData", "false" },
+ { "brokerClientAuthenticationPlugin", AuthenticationPlugin },
+ { "CLIENT_PREFIX_authPlugin", AuthenticationPlugin }
};
- var arguments = "\"" +
- $"bin/pulsar tokens create-secret-key --output
{SecretKeyPath} && " +
- $"export
brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create
--secret-key {SecretKeyPath} --subject {UserName}) && " +
- "export
CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && " +
- "bin/apply-config-from-env.py conf/standalone.conf &&
" +
- "bin/apply-config-from-env-with-prefix.py
CLIENT_PREFIX_ conf/client.conf && " +
- "bin/pulsar standalone --no-functions-worker"
- + "\"";
-
- _cluster = new Builder()
- .UseContainer()
- .UseImage("apachepulsar/pulsar:3.1.1")
+ var arguments =
+ $"bin/pulsar tokens create-secret-key --output {SecretKeyPath} &&
" +
+ $"export brokerClientAuthenticationParameters=token:$(bin/pulsar
tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " +
+ $"export
CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters &&
bin/apply-config-from-env.py conf/standalone.conf && " +
+ $"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_
conf/client.conf && bin/pulsar standalone --no-functions-worker";
+
+ _cluster = new ContainerBuilder()
+ .WithImage("apachepulsar/pulsar:3.1.1")
.WithEnvironment(environmentVariables)
- .ExposePort(Port)
- .Command("/bin/bash -c", arguments)
+ .WithPortBinding(Port, true)
+
.WithWaitStrategy(Wait.ForUnixContainer().UntilCommandIsCompleted(["/bin/bash",
"-c", "bin/pulsar-admin clusters list"]))
+ .WithCommand("/bin/bash", "-c", arguments)
.Build();
- ServiceUrl = new Uri("pulsar://localhost:6650");
+ ServiceUrl = new Uri($"pulsar://{_cluster.Hostname}:{Port}");
}
public Uri ServiceUrl { get; private set; }
public IAuthentication Authentication => AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_token!));
- public Task DisposeAsync()
+ public async Task DisposeAsync()
{
- _cluster.Dispose();
- return Task.CompletedTask;
+ await _cluster.DisposeAsync();
+ _cts.Dispose();
}
- public Task InitializeAsync()
+ private void HandleClusterStateChange(string state) =>
+ _messageSink.OnMessage(new DiagnosticMessage($"The Pulsar cluster
changed state to: {state}"));
+
+ public async Task InitializeAsync()
{
- _cluster.StateChange += (sender, args) => _messageSink.OnMessage(new
DiagnosticMessage($"The Pulsar cluster changed state to: {args.State}"));
+ _cluster.Created += (_, _) => HandleClusterStateChange("Created");
+ _cluster.Creating += (_, _) => HandleClusterStateChange("Creating");
+ _cluster.Started += (_, _) => HandleClusterStateChange("Started");
+ _cluster.Starting += (_, _) => HandleClusterStateChange("Starting");
+ _cluster.Stopped += (_, _) => HandleClusterStateChange("Stopped");
+ _cluster.Stopping += (_, _) => HandleClusterStateChange("Stopping");
+
_messageSink.OnMessage(new DiagnosticMessage("Starting container
service"));
- _cluster.Start();
- _messageSink.OnMessage(new DiagnosticMessage("Container service
started. Waiting for message in logs"));
- _cluster.WaitForMessageInLogs("[test-user] Created namespace
public/default", int.MaxValue);
- _messageSink.OnMessage(new DiagnosticMessage("Got message, will now
get endpoint"));
- var endpoint = _cluster.ToHostExposedEndpoint($"{Port}/tcp");
+ await _cluster.StartAsync(_cts.Token);
+ _messageSink.OnMessage(new DiagnosticMessage("The container service
has initiated. Next, we'll retrieve the endpoint."));
+ var endpoint = _cluster.GetMappedPublicPort(Port);
_messageSink.OnMessage(new DiagnosticMessage($"Endpoint opened at
{endpoint}"));
- ServiceUrl = new Uri($"pulsar://localhost:{endpoint.Port}");
- _token = CreateToken(Timeout.InfiniteTimeSpan);
- return Task.CompletedTask;
+ ServiceUrl = new Uri($"pulsar://{_cluster.Hostname}:{endpoint}");
+ _token = await CreateToken(Timeout.InfiniteTimeSpan, _cts.Token);
}
- public string CreateToken(TimeSpan expiryTime)
+ public async Task<string> CreateToken(TimeSpan expiryTime,
CancellationToken cancellationToken)
{
var arguments = $"bin/pulsar tokens create --secret-key
{SecretKeyPath} --subject {UserName}";
if (expiryTime != Timeout.InfiniteTimeSpan)
arguments += $" --expiry-time {expiryTime.TotalSeconds}s";
- var result = _cluster.Execute(arguments);
+ var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
- if (!result.Success)
- throw new InvalidOperationException($"Could not create the token:
{result.Error}");
+ if (result.ExitCode != 0)
+ throw new InvalidOperationException($"Could not create the token:
{result.Stderr}");
- return result.Data[0];
+ return result.Stdout;
}
private static string CreateTopicName() =>
$"persistent://public/default/{Guid.NewGuid():N}";
- public string CreateTopic()
+ public async Task<string> CreateTopic(CancellationToken cancellationToken)
{
var topic = CreateTopicName();
- CreateTopic(topic);
+ await CreateTopic(topic, cancellationToken);
return topic;
}
- public void CreateTopic(string topic)
+ public async Task CreateTopic(string topic, CancellationToken
cancellationToken)
{
var arguments = $"bin/pulsar-admin topics create {topic}";
- var result = _cluster.Execute(arguments);
+ var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
- if (!result.Success)
- throw new Exception($"Could not create the topic: {result.Error}");
+ if (result.ExitCode != 0)
+ throw new Exception($"Could not create the topic:
{result.Stderr}");
}
- public string CreatePartitionedTopic(int numberOfPartitions)
+ public async Task<string> CreatePartitionedTopic(int numberOfPartitions,
CancellationToken cancellationToken)
{
var topic = CreateTopicName();
- CreatePartitionedTopic(topic, numberOfPartitions);
+ await CreatePartitionedTopic(topic, numberOfPartitions,
cancellationToken);
return topic;
}
- public void CreatePartitionedTopic(string topic, int numberOfPartitions)
+ public async Task CreatePartitionedTopic(string topic, int
numberOfPartitions, CancellationToken cancellationToken)
{
var arguments = $"bin/pulsar-admin topics create-partitioned-topic
{topic} -p {numberOfPartitions}";
- var result = _cluster.Execute(arguments);
+ var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c",
arguments }, cancellationToken);
- if (!result.Success)
- throw new Exception($"Could not create the partitioned topic:
{result.Error}");
+ if (result.ExitCode != 0)
+ throw new Exception($"Could not create the partitioned topic:
{result.Stderr}");
}
}
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 73d2938..e74fb9b 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -17,8 +17,6 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
-using System;
-using System.Collections.Generic;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
@@ -39,7 +37,7 @@ public sealed class ConsumerTests : IDisposable
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -69,7 +67,7 @@ public sealed class ConsumerTests : IDisposable
//Arrange
const int numberOfMessages = 6;
const int partitions = 3;
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
@@ -97,7 +95,7 @@ public sealed class ConsumerTests : IDisposable
{
//Arrange
await using var client = CreateClient();
- await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
+ await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
var expected = new List<MessageId>() { MessageId.Earliest };
//Act
@@ -111,7 +109,7 @@ public sealed class ConsumerTests : IDisposable
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 1000;
await using var client = CreateClient();
@@ -133,7 +131,7 @@ public sealed class ConsumerTests : IDisposable
const int numberOfMessages = 1000;
const int partitions = 3;
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
@@ -160,7 +158,7 @@ public sealed class ConsumerTests : IDisposable
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
+ await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
var receiveTask = consumer.Receive(_cts.Token).AsTask();
semaphoreSlim.Release();
@@ -183,7 +181,7 @@ public sealed class ConsumerTests : IDisposable
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = CreateConsumer(client,
_fixture.CreateTopic());
+ await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index eb416ad..545849c 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -16,8 +16,6 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
-using System;
-using System.Collections.Generic;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
@@ -39,7 +37,7 @@ public sealed class ProducerTests : IDisposable
{
//Arrange
const string content = "test-message";
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
await using var consumer = CreateConsumer(client, topicName);
@@ -58,7 +56,7 @@ public sealed class ProducerTests : IDisposable
{
//Arrange
const string content = "test-message";
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
await using var consumer = CreateConsumer(client, topicName);
@@ -106,7 +104,7 @@ public sealed class ProducerTests : IDisposable
ProducerState expectedStateForProducer2)
{
//Arrange
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer1 = CreateProducer(client, topicName,
accessModeForProducer1);
await producer1.OnStateChangeTo(ProducerState.Connected, _cts.Token);
@@ -143,7 +141,7 @@ public sealed class ProducerTests : IDisposable
const string content = "test-message";
const int partitions = 3;
const int msgCount = 3;
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
await using var client = CreateClient();
//Act
@@ -189,7 +187,7 @@ public sealed class ProducerTests : IDisposable
const int partitions = 3;
var consumers = new List<IConsumer<string>>();
await using var client = CreateClient();
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
//Act
await using var producer = CreateProducer(client, topicName);
@@ -213,7 +211,7 @@ public sealed class ProducerTests : IDisposable
{
//Arrange
const int numberOfMessages = 10;
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
@@ -245,7 +243,7 @@ public sealed class ProducerTests : IDisposable
//Arrange
const int numberOfMessages = 10;
const int partitions = 4;
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 70df162..7969844 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -17,8 +17,6 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
-using System;
-using System.Collections.Generic;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
@@ -40,7 +38,7 @@ public sealed class ReaderTests : IDisposable
{
//Arrange
await using var client = CreateClient();
- await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
+ await using var reader = CreateReader(client, MessageId.Earliest,
await _fixture.CreateTopic(_cts.Token));
var expected = new List<MessageId>() { MessageId.Earliest };
//Act
@@ -54,7 +52,7 @@ public sealed class ReaderTests : IDisposable
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 6;
await using var client = CreateClient();
@@ -83,7 +81,7 @@ public sealed class ReaderTests : IDisposable
//Arrange
const int numberOfMessages = 6;
const int partitions = 3;
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
await using var client = CreateClient();
await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
@@ -110,7 +108,7 @@ public sealed class ReaderTests : IDisposable
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
- var topicName = _fixture.CreateTopic();
+ var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 10;
await using var client = CreateClient();
@@ -142,7 +140,7 @@ public sealed class ReaderTests : IDisposable
//Arrange
const int numberOfMessages = 50;
const int partitions = 3;
- var topicName = _fixture.CreatePartitionedTopic(partitions);
+ var topicName = await _fixture.CreatePartitionedTopic(partitions,
_cts.Token);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
@@ -180,7 +178,7 @@ public sealed class ReaderTests : IDisposable
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
+ await using var reader = CreateReader(client, MessageId.Earliest,
await _fixture.CreateTopic(_cts.Token));
var receiveTask = reader.Receive(_cts.Token).AsTask();
semaphoreSlim.Release();
@@ -203,7 +201,7 @@ public sealed class ReaderTests : IDisposable
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = CreateReader(client, MessageId.Earliest,
_fixture.CreateTopic());
+ await using var reader = CreateReader(client, MessageId.Earliest,
await _fixture.CreateTopic(_cts.Token));
await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs
b/tests/DotPulsar.Tests/PulsarClientTests.cs
index 328334b..f90e8c8 100644
--- a/tests/DotPulsar.Tests/PulsarClientTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -17,7 +17,6 @@ namespace DotPulsar.Tests;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
-using System;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
@@ -39,7 +38,7 @@ public sealed class PulsarClientTests : IDisposable
{
// Arrange
await using var client = CreateClient(ct => throw new Exception());
- await using var producer = CreateProducer(client);
+ await using var producer = await CreateProducer(client);
// Act
var exception = await Record.ExceptionAsync(() =>
producer.Send("Test", _cts.Token).AsTask());
@@ -55,16 +54,16 @@ public sealed class PulsarClientTests : IDisposable
{
// Arrange
var throwException = false;
- await using var client = CreateClient(ct =>
+ await using var client = CreateClient(async ct =>
{
if (throwException)
throw new Exception();
- var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
+ var token = await _fixture.CreateToken(TimeSpan.FromSeconds(10),
_cts.Token);
_testOutputHelper.Log($"Received token: {token}");
- return ValueTask.FromResult(token);
+ return token;
});
- await using var producer = CreateProducer(client);
+ await using var producer = await CreateProducer(client);
// Act
_ = await producer.Send("Test", _cts.Token); // Make sure we have a
working connection
@@ -85,7 +84,7 @@ public sealed class PulsarClientTests : IDisposable
return string.Empty;
});
- await using var producer = CreateProducer(client);
+ await using var producer = await CreateProducer(client);
// Act
var exception = await Record.ExceptionAsync(() =>
producer.Send("Test", _cts.Token).AsTask());
@@ -104,18 +103,18 @@ public sealed class PulsarClientTests : IDisposable
var refreshCount = 0;
var tcs = new
TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- await using var client = CreateClient(ct =>
+ await using var client = CreateClient(async ct =>
{
++refreshCount;
if (refreshCount == 3)
tcs.SetResult();
- var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
+ var token = await _fixture.CreateToken(TimeSpan.FromSeconds(10),
_cts.Token);
_testOutputHelper.Log($"Received token: {token}");
- return ValueTask.FromResult(token);
+ return token;
});
- await using var producer = CreateProducer(client);
+ await using var producer = await CreateProducer(client);
// Act
_ = await producer.Send("Test", _cts.Token); // Make sure we have a
working connection
@@ -134,10 +133,10 @@ public sealed class PulsarClientTests : IDisposable
.ServiceUrl(_fixture.ServiceUrl)
.Build();
- private IProducer<string> CreateProducer(IPulsarClient client)
+ private async Task<IProducer<string>> CreateProducer(IPulsarClient client)
=> client
.NewProducer(Schema.String)
- .Topic(_fixture.CreateTopic())
+ .Topic(await _fixture.CreateTopic(_cts.Token))
.StateChangedHandler(_testOutputHelper.Log)
.Create();