This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c40f55d  Migrated from fluent docker to testcontainers (#197)
c40f55d is described below

commit c40f55dda722825e2c34853084ec6cb21b9be9ff
Author: entvex <[email protected]>
AuthorDate: Fri Dec 22 14:16:07 2023 +0100

    Migrated from fluent docker to testcontainers (#197)
    
    * Migrated from fluent docker to testcontainers
    
    * Made improvements to IntegrationFixture.cs according to feedback.
    
    * IntegrationFixture gets better CancellationToken support.
    
    Implemented CancellationToken support for methods and enabled
    Testcontainers to dynamically map to a random port on the host.
    Tests dependent on the functionality within IntegrationFixture now
    include cancellationToken where necessary.
    
    * Fixed warning and indents
    
    * Made a DiagnosticMessage clearer.
---
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj    |   2 +-
 tests/DotPulsar.Tests/IntegrationFixture.cs     | 123 ++++++++++++------------
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs |  16 ++-
 tests/DotPulsar.Tests/Internal/ProducerTests.cs |  16 ++-
 tests/DotPulsar.Tests/Internal/ReaderTests.cs   |  16 ++-
 tests/DotPulsar.Tests/PulsarClientTests.cs      |  25 +++--
 6 files changed, 97 insertions(+), 101 deletions(-)

diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj 
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 450dc44..9ee0ddf 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -19,12 +19,12 @@
     <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" />
     <PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" />
     <PackageReference Include="DotNetZip" Version="1.16.0" />
-    <PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
     <PackageReference Include="FluentAssertions" Version="6.12.0" />
     <PackageReference Include="IronSnappy" Version="1.3.1" />
     <PackageReference Include="K4os.Compression.LZ4" Version="1.3.6" />
     <PackageReference Include="NSubstitute" Version="5.1.0" />
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
+    <PackageReference Include="Testcontainers" Version="3.6.0" />
     <PackageReference Include="xunit" Version="2.6.3" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.5.5">
       <PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs 
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index b6a019b..001cd8a 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -14,11 +14,9 @@
 
 namespace DotPulsar.Tests;
 
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
 using DotPulsar.Abstractions;
-using Ductus.FluentDocker.Builders;
-using Ductus.FluentDocker.Services;
-using Ductus.FluentDocker.Services.Extensions;
-using System;
 using Xunit.Abstractions;
 using Xunit.Sdk;
 
@@ -28,122 +26,127 @@ public class IntegrationFixture : IAsyncLifetime
     private const string SecretKeyPath = "/pulsar/secret.key";
     private const string UserName = "test-user";
     private const int Port = 6650;
+    private readonly CancellationTokenSource _cts;
 
     private readonly IMessageSink _messageSink;
-    private readonly IContainerService _cluster;
+    private readonly IContainer _cluster;
 
     private string? _token;
 
     public IntegrationFixture(IMessageSink messageSink)
     {
         _messageSink = messageSink;
+        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));
 
-        var environmentVariables = new[]
+        var environmentVariables = new Dictionary<string, string>()
         {
-            $"PULSAR_PREFIX_tokenSecretKey=file://{SecretKeyPath}",
-            "PULSAR_PREFIX_authenticationRefreshCheckSeconds=5",
-            $"superUserRoles={UserName}",
-            "authenticationEnabled=true",
-            "authorizationEnabled=true",
-            
"authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken",
-            "authenticateOriginalAuthData=false",
-            $"brokerClientAuthenticationPlugin={AuthenticationPlugin}",
-            $"CLIENT_PREFIX_authPlugin={AuthenticationPlugin}",
+            { "PULSAR_PREFIX_tokenSecretKey", $"file://{SecretKeyPath}" },
+            { "PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5" },
+            { "superUserRoles", UserName },
+            { "authenticationEnabled", "true" },
+            { "authorizationEnabled", "true" },
+            { "authenticationProviders", 
"org.apache.pulsar.broker.authentication.AuthenticationProviderToken" },
+            { "authenticateOriginalAuthData", "false" },
+            { "brokerClientAuthenticationPlugin", AuthenticationPlugin },
+            { "CLIENT_PREFIX_authPlugin", AuthenticationPlugin }
         };
 
-        var arguments = "\"" +
-                        $"bin/pulsar tokens create-secret-key --output 
{SecretKeyPath} && " +
-                        $"export 
brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create 
--secret-key {SecretKeyPath} --subject {UserName}) && " +
-                        "export 
CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && " +
-                        "bin/apply-config-from-env.py conf/standalone.conf && 
" +
-                        "bin/apply-config-from-env-with-prefix.py 
CLIENT_PREFIX_ conf/client.conf && " +
-                        "bin/pulsar standalone --no-functions-worker"
-                        + "\"";
-
-        _cluster = new Builder()
-            .UseContainer()
-            .UseImage("apachepulsar/pulsar:3.1.1")
+        var arguments =
+            $"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && 
" +
+            $"export brokerClientAuthenticationParameters=token:$(bin/pulsar 
tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " +
+            $"export 
CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && 
bin/apply-config-from-env.py conf/standalone.conf && " +
+            $"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ 
conf/client.conf && bin/pulsar standalone --no-functions-worker";
+
+        _cluster = new ContainerBuilder()
+            .WithImage("apachepulsar/pulsar:3.1.1")
             .WithEnvironment(environmentVariables)
-            .ExposePort(Port)
-            .Command("/bin/bash -c", arguments)
+            .WithPortBinding(Port, true)
+            
.WithWaitStrategy(Wait.ForUnixContainer().UntilCommandIsCompleted(["/bin/bash", 
"-c", "bin/pulsar-admin clusters list"]))
+            .WithCommand("/bin/bash", "-c", arguments)
             .Build();
 
-        ServiceUrl = new Uri("pulsar://localhost:6650");
+        ServiceUrl = new Uri($"pulsar://{_cluster.Hostname}:{Port}");
     }
 
     public Uri ServiceUrl { get; private set; }
 
     public IAuthentication Authentication => AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_token!));
 
-    public Task DisposeAsync()
+    public async Task DisposeAsync()
     {
-        _cluster.Dispose();
-        return Task.CompletedTask;
+        await _cluster.DisposeAsync();
+        _cts.Dispose();
     }
 
-    public Task InitializeAsync()
+    private void HandleClusterStateChange(string state) =>
+        _messageSink.OnMessage(new DiagnosticMessage($"The Pulsar cluster 
changed state to: {state}"));
+
+    public async Task InitializeAsync()
     {
-        _cluster.StateChange += (sender, args) => _messageSink.OnMessage(new 
DiagnosticMessage($"The Pulsar cluster changed state to: {args.State}"));
+        _cluster.Created += (_, _) => HandleClusterStateChange("Created");
+        _cluster.Creating += (_, _) => HandleClusterStateChange("Creating");
+        _cluster.Started += (_, _) => HandleClusterStateChange("Started");
+        _cluster.Starting += (_, _) => HandleClusterStateChange("Starting");
+        _cluster.Stopped += (_, _) => HandleClusterStateChange("Stopped");
+        _cluster.Stopping += (_, _) => HandleClusterStateChange("Stopping");
+
         _messageSink.OnMessage(new DiagnosticMessage("Starting container 
service"));
-        _cluster.Start();
-        _messageSink.OnMessage(new DiagnosticMessage("Container service 
started. Waiting for message in logs"));
-        _cluster.WaitForMessageInLogs("[test-user] Created namespace 
public/default", int.MaxValue);
-        _messageSink.OnMessage(new DiagnosticMessage("Got message, will now 
get endpoint"));
-        var endpoint = _cluster.ToHostExposedEndpoint($"{Port}/tcp");
+        await _cluster.StartAsync(_cts.Token);
+        _messageSink.OnMessage(new DiagnosticMessage("The container service 
has initiated. Next, we'll retrieve the endpoint."));
+        var endpoint = _cluster.GetMappedPublicPort(Port);
         _messageSink.OnMessage(new DiagnosticMessage($"Endpoint opened at 
{endpoint}"));
-        ServiceUrl = new Uri($"pulsar://localhost:{endpoint.Port}");
-        _token = CreateToken(Timeout.InfiniteTimeSpan);
-        return Task.CompletedTask;
+        ServiceUrl = new Uri($"pulsar://{_cluster.Hostname}:{endpoint}");
+        _token = await CreateToken(Timeout.InfiniteTimeSpan, _cts.Token);
     }
 
-    public string CreateToken(TimeSpan expiryTime)
+    public async Task<string> CreateToken(TimeSpan expiryTime, 
CancellationToken cancellationToken)
     {
         var arguments = $"bin/pulsar tokens create --secret-key 
{SecretKeyPath} --subject {UserName}";
 
         if (expiryTime != Timeout.InfiniteTimeSpan)
             arguments += $" --expiry-time {expiryTime.TotalSeconds}s";
 
-        var result = _cluster.Execute(arguments);
+        var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c", 
arguments }, cancellationToken);
 
-        if (!result.Success)
-            throw new InvalidOperationException($"Could not create the token: 
{result.Error}");
+        if (result.ExitCode != 0)
+            throw new InvalidOperationException($"Could not create the token: 
{result.Stderr}");
 
-        return result.Data[0];
+        return result.Stdout;
     }
 
     private static string CreateTopicName() => 
$"persistent://public/default/{Guid.NewGuid():N}";
 
-    public string CreateTopic()
+    public async Task<string> CreateTopic(CancellationToken cancellationToken)
     {
         var topic = CreateTopicName();
-        CreateTopic(topic);
+        await CreateTopic(topic, cancellationToken);
         return topic;
     }
 
-    public void CreateTopic(string topic)
+    public async Task CreateTopic(string topic, CancellationToken 
cancellationToken)
     {
         var arguments = $"bin/pulsar-admin topics create {topic}";
 
-        var result = _cluster.Execute(arguments);
+        var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c", 
arguments }, cancellationToken);
 
-        if (!result.Success)
-            throw new Exception($"Could not create the topic: {result.Error}");
+        if (result.ExitCode != 0)
+            throw new Exception($"Could not create the topic: 
{result.Stderr}");
     }
 
-    public string CreatePartitionedTopic(int numberOfPartitions)
+    public async Task<string> CreatePartitionedTopic(int numberOfPartitions, 
CancellationToken cancellationToken)
     {
         var topic = CreateTopicName();
-        CreatePartitionedTopic(topic, numberOfPartitions);
+        await CreatePartitionedTopic(topic, numberOfPartitions, 
cancellationToken);
         return topic;
     }
 
-    public void CreatePartitionedTopic(string topic, int numberOfPartitions)
+    public async Task CreatePartitionedTopic(string topic, int 
numberOfPartitions, CancellationToken cancellationToken)
     {
         var arguments = $"bin/pulsar-admin topics create-partitioned-topic 
{topic} -p {numberOfPartitions}";
 
-        var result = _cluster.Execute(arguments);
+        var result = await _cluster.ExecAsync(new[] { "/bin/bash", "-c", 
arguments }, cancellationToken);
 
-        if (!result.Success)
-            throw new Exception($"Could not create the partitioned topic: 
{result.Error}");
+        if (result.ExitCode != 0)
+            throw new Exception($"Could not create the partitioned topic: 
{result.Stderr}");
     }
 }
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 73d2938..e74fb9b 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -17,8 +17,6 @@ namespace DotPulsar.Tests.Internal;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
-using System;
-using System.Collections.Generic;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
@@ -39,7 +37,7 @@ public sealed class ConsumerTests : IDisposable
     public async Task 
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -69,7 +67,7 @@ public sealed class ConsumerTests : IDisposable
         //Arrange
         const int numberOfMessages = 6;
         const int partitions = 3;
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
@@ -97,7 +95,7 @@ public sealed class ConsumerTests : IDisposable
     {
         //Arrange
         await using var client = CreateClient();
-        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
+        await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
         var expected = new List<MessageId>() { MessageId.Earliest };
 
         //Act
@@ -111,7 +109,7 @@ public sealed class ConsumerTests : IDisposable
     public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         const int numberOfMessages = 1000;
 
         await using var client = CreateClient();
@@ -133,7 +131,7 @@ public sealed class ConsumerTests : IDisposable
         const int numberOfMessages = 1000;
         const int partitions = 3;
 
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
@@ -160,7 +158,7 @@ public sealed class ConsumerTests : IDisposable
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
+        await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
 
         var receiveTask = consumer.Receive(_cts.Token).AsTask();
         semaphoreSlim.Release();
@@ -183,7 +181,7 @@ public sealed class ConsumerTests : IDisposable
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = CreateConsumer(client, 
_fixture.CreateTopic());
+        await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
 
         await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
 
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs 
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index eb416ad..545849c 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -16,8 +16,6 @@ namespace DotPulsar.Tests.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
-using System;
-using System.Collections.Generic;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
@@ -39,7 +37,7 @@ public sealed class ProducerTests : IDisposable
     {
         //Arrange
         const string content = "test-message";
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
         await using var consumer = CreateConsumer(client, topicName);
@@ -58,7 +56,7 @@ public sealed class ProducerTests : IDisposable
     {
         //Arrange
         const string content = "test-message";
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
         await using var consumer = CreateConsumer(client, topicName);
@@ -106,7 +104,7 @@ public sealed class ProducerTests : IDisposable
         ProducerState expectedStateForProducer2)
     {
         //Arrange
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         await using var client = CreateClient();
         await using var producer1 = CreateProducer(client, topicName, 
accessModeForProducer1);
         await producer1.OnStateChangeTo(ProducerState.Connected, _cts.Token);
@@ -143,7 +141,7 @@ public sealed class ProducerTests : IDisposable
         const string content = "test-message";
         const int partitions = 3;
         const int msgCount = 3;
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
         await using var client = CreateClient();
 
         //Act
@@ -189,7 +187,7 @@ public sealed class ProducerTests : IDisposable
         const int partitions = 3;
         var consumers = new List<IConsumer<string>>();
         await using var client = CreateClient();
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
 
         //Act
         await using var producer = CreateProducer(client, topicName);
@@ -213,7 +211,7 @@ public sealed class ProducerTests : IDisposable
     {
         //Arrange
         const int numberOfMessages = 10;
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
 
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
@@ -245,7 +243,7 @@ public sealed class ProducerTests : IDisposable
         //Arrange
         const int numberOfMessages = 10;
         const int partitions = 4;
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, topicName);
         await using var producer = CreateProducer(client, topicName);
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs 
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 70df162..7969844 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -17,8 +17,6 @@ namespace DotPulsar.Tests.Internal;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
-using System;
-using System.Collections.Generic;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
@@ -40,7 +38,7 @@ public sealed class ReaderTests : IDisposable
     {
         //Arrange
         await using var client = CreateClient();
-        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
+        await using var reader = CreateReader(client, MessageId.Earliest, 
await _fixture.CreateTopic(_cts.Token));
         var expected = new List<MessageId>() { MessageId.Earliest };
 
         //Act
@@ -54,7 +52,7 @@ public sealed class ReaderTests : IDisposable
     public async Task 
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
     {
         //Arrange
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         const int numberOfMessages = 6;
 
         await using var client = CreateClient();
@@ -83,7 +81,7 @@ public sealed class ReaderTests : IDisposable
         //Arrange
         const int numberOfMessages = 6;
         const int partitions = 3;
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
 
         await using var client = CreateClient();
         await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
@@ -110,7 +108,7 @@ public sealed class ReaderTests : IDisposable
     public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
     {
         //Arrange
-        var topicName = _fixture.CreateTopic();
+        var topicName = await _fixture.CreateTopic(_cts.Token);
         const int numberOfMessages = 10;
 
         await using var client = CreateClient();
@@ -142,7 +140,7 @@ public sealed class ReaderTests : IDisposable
         //Arrange
         const int numberOfMessages = 50;
         const int partitions = 3;
-        var topicName = _fixture.CreatePartitionedTopic(partitions);
+        var topicName = await _fixture.CreatePartitionedTopic(partitions, 
_cts.Token);
 
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
@@ -180,7 +178,7 @@ public sealed class ReaderTests : IDisposable
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
+        await using var reader = CreateReader(client, MessageId.Earliest, 
await _fixture.CreateTopic(_cts.Token));
 
         var receiveTask = reader.Receive(_cts.Token).AsTask();
         semaphoreSlim.Release();
@@ -203,7 +201,7 @@ public sealed class ReaderTests : IDisposable
         })
         .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = CreateReader(client, MessageId.Earliest, 
_fixture.CreateTopic());
+        await using var reader = CreateReader(client, MessageId.Earliest, 
await _fixture.CreateTopic(_cts.Token));
 
         await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
 
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs 
b/tests/DotPulsar.Tests/PulsarClientTests.cs
index 328334b..f90e8c8 100644
--- a/tests/DotPulsar.Tests/PulsarClientTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -17,7 +17,6 @@ namespace DotPulsar.Tests;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
-using System;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
@@ -39,7 +38,7 @@ public sealed class PulsarClientTests : IDisposable
     {
         // Arrange
         await using var client = CreateClient(ct => throw new Exception());
-        await using var producer = CreateProducer(client);
+        await using var producer = await CreateProducer(client);
 
         // Act
         var exception = await Record.ExceptionAsync(() => 
producer.Send("Test", _cts.Token).AsTask());
@@ -55,16 +54,16 @@ public sealed class PulsarClientTests : IDisposable
     {
         // Arrange
         var throwException = false;
-        await using var client = CreateClient(ct =>
+        await using var client = CreateClient(async ct =>
         {
             if (throwException)
                 throw new Exception();
-            var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
+            var token = await _fixture.CreateToken(TimeSpan.FromSeconds(10), 
_cts.Token);
             _testOutputHelper.Log($"Received token: {token}");
-            return ValueTask.FromResult(token);
+            return token;
         });
 
-        await using var producer = CreateProducer(client);
+        await using var producer = await CreateProducer(client);
 
         // Act
         _ = await producer.Send("Test", _cts.Token); // Make sure we have a 
working connection
@@ -85,7 +84,7 @@ public sealed class PulsarClientTests : IDisposable
             return string.Empty;
         });
 
-        await using var producer = CreateProducer(client);
+        await using var producer = await CreateProducer(client);
 
         // Act
         var exception = await Record.ExceptionAsync(() => 
producer.Send("Test", _cts.Token).AsTask());
@@ -104,18 +103,18 @@ public sealed class PulsarClientTests : IDisposable
         var refreshCount = 0;
         var tcs = new 
TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
-        await using var client = CreateClient(ct =>
+        await using var client = CreateClient(async ct =>
         {
             ++refreshCount;
             if (refreshCount == 3)
                 tcs.SetResult();
 
-            var token = _fixture.CreateToken(TimeSpan.FromSeconds(10));
+            var token = await _fixture.CreateToken(TimeSpan.FromSeconds(10), 
_cts.Token);
             _testOutputHelper.Log($"Received token: {token}");
-            return ValueTask.FromResult(token);
+            return token;
         });
 
-        await using var producer = CreateProducer(client);
+        await using var producer = await CreateProducer(client);
 
         // Act
         _ = await producer.Send("Test", _cts.Token); // Make sure we have a 
working connection
@@ -134,10 +133,10 @@ public sealed class PulsarClientTests : IDisposable
         .ServiceUrl(_fixture.ServiceUrl)
         .Build();
 
-    private IProducer<string> CreateProducer(IPulsarClient client)
+    private async Task<IProducer<string>> CreateProducer(IPulsarClient client)
         => client
         .NewProducer(Schema.String)
-        .Topic(_fixture.CreateTopic())
+        .Topic(await _fixture.CreateTopic(_cts.Token))
         .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 

Reply via email to