This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1d3d8fd Creating an IAuthentication interface and an
AuthenticationFactory to be more aligned with the other clients and with the
way exception handling is implemented. Known issue (at least locally) with
the standalone clusters used for tests: we get an
"org.apache.pulsar.broker.web.RestException: Policies not found for
public/default namespace" exception about 50% of the time when using
docker-compose-standalone-tests.yml and every time when using
docker-compose-standalone-token-tests.yml.
1d3d8fd is described below
commit 1d3d8fdb969bf082c8130447c7603ade9f4c5c8d
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 28 12:01:27 2022 +0100
Creating an IAuthentication interface and an AuthenticationFactory to be
more aligned with the other clients and with the way exception handling is
implemented.
Known issue (at least locally) with the standalone clusters used for tests:
we get an "org.apache.pulsar.broker.web.RestException: Policies not found
for public/default namespace" exception about 50% of the time when using
docker-compose-standalone-tests.yml and every time when using
docker-compose-standalone-token-tests.yml.
---
src/DotPulsar/Abstractions/IAuthentication.cs | 34 ++++++
src/DotPulsar/Abstractions/IPulsarClientBuilder.cs | 6 +-
src/DotPulsar/AuthenticationFactory.cs | 37 +++++++
src/DotPulsar/Internal/Connection.cs | 45 ++++----
src/DotPulsar/Internal/ConnectionPool.cs | 29 ++---
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 1 -
src/DotPulsar/Internal/DotPulsarEventSource.cs | 17 ---
.../Exceptions/TokenFactoryFailedException.cs | 10 --
src/DotPulsar/Internal/PulsarClientBuilder.cs | 31 ++----
src/DotPulsar/Internal/TokenAuthentication.cs | 37 +++++++
src/DotPulsar/Internal/TokenFactoryExtensions.cs | 26 -----
.../DotPulsar.IntegrationTests.csproj | 2 +-
...Fixture.cs => StandaloneTokenClusterFixture.cs} | 41 +++++---
.../Fixtures/StandaloneTokenClusterTest.cs | 6 --
.../Fixtures/StandaloneTokenClusterTests.cs | 20 ++++
.../ProcessAsyncHelper.cs | 20 +++-
.../TokenRefreshTests.cs | 117 +++++++++++++--------
.../DotPulsar.StressTests.csproj | 2 +-
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 4 +-
19 files changed, 297 insertions(+), 188 deletions(-)
diff --git a/src/DotPulsar/Abstractions/IAuthentication.cs
b/src/DotPulsar/Abstractions/IAuthentication.cs
new file mode 100644
index 0000000..102eb05
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IAuthentication.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions;
+
+using System.Threading;
+using System.Threading.Tasks;
+
+/// <summary>
+/// An authentication abstraction.
+/// </summary>
+public interface IAuthentication
+{
+ /// <summary>
+ /// The authentication method name
+ /// </summary>
+ string AuthenticationMethodName { get; }
+
+ /// <summary>
+ /// Get the authentication data
+ /// </summary>
+ ValueTask<byte[]> GetAuthenticationData(CancellationToken
cancellationToken);
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index ea2ab5b..9cf41e0 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -16,7 +16,6 @@ namespace DotPulsar.Abstractions;
using System;
using System.Security.Cryptography.X509Certificates;
-using System.Threading.Tasks;
/// <summary>
/// A pulsar client building abstraction.
@@ -31,12 +30,13 @@ public interface IPulsarClientBuilder
/// <summary>
/// Authenticate using a (JSON Web) token. This is optional.
/// </summary>
+ [Obsolete("This method is obsolete. Call
Authentication(AuthenticationFactory.Token(...)) instead.", false)]
IPulsarClientBuilder AuthenticateUsingToken(string token);
/// <summary>
- /// Authenticate using a (JSON Web) token factory. This is optional.
+ /// Set the authentication provider. This is optional.
/// </summary>
- IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>>
tokenFactory);
+ IPulsarClientBuilder Authentication(IAuthentication authentication);
/// <summary>
/// Set connection encryption policy. The default is 'EnforceUnencrypted'
if the ServiceUrl scheme is 'pulsar' and 'EnforceEncrypted' if it's
'pulsar+ssl'.
diff --git a/src/DotPulsar/AuthenticationFactory.cs
b/src/DotPulsar/AuthenticationFactory.cs
new file mode 100644
index 0000000..6ed3dfb
--- /dev/null
+++ b/src/DotPulsar/AuthenticationFactory.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Internal;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Factory class for creating IAuthentication instances for all the
supported authentication methods.
+/// </summary>
+public static class AuthenticationFactory
+{
+ /// <summary>
+ /// Create an authentication provider for token based authentication.
+ /// </summary>
+ public static IAuthentication Token(string token) => new
TokenAuthentication(token);
+
+ /// <summary>
+ /// Create an authentication provider for token based authentication, using a supplier that is called each time authentication data is requested.
+ /// </summary>
+ public static IAuthentication Token(Func<CancellationToken,
ValueTask<string>> tokenSupplier) => new TokenAuthentication(tokenSupplier);
+}
diff --git a/src/DotPulsar/Internal/Connection.cs
b/src/DotPulsar/Internal/Connection.cs
index 60c6433..5b35c7d 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,13 +15,12 @@
namespace DotPulsar.Internal;
using Abstractions;
+using DotPulsar.Abstractions;
using Exceptions;
using Extensions;
using PulsarApi;
using System;
using System.Buffers;
-using System.Diagnostics;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -31,16 +30,16 @@ public sealed class Connection : IConnection
private readonly ChannelManager _channelManager;
private readonly PingPongHandler _pingPongHandler;
private readonly IPulsarStream _stream;
- private readonly Func<Task<string>>? _accessTokenFactory;
+ private readonly IAuthentication? _authentication;
private int _isDisposed;
- public Connection(IPulsarStream stream, TimeSpan keepAliveInterval,
Func<Task<string>>? accessTokenFactory)
+ public Connection(IPulsarStream stream, TimeSpan keepAliveInterval,
IAuthentication? authentication)
{
_lock = new AsyncLock();
_channelManager = new ChannelManager();
_pingPongHandler = new PingPongHandler(this, keepAliveInterval);
_stream = stream;
- _accessTokenFactory = accessTokenFactory;
+ _authentication = authentication;
}
public async ValueTask<bool> HasChannels(CancellationToken
cancellationToken)
@@ -85,8 +84,19 @@ public sealed class Connection : IConnection
return await responseTask.ConfigureAwait(false);
}
- private Task Send(CommandAuthResponse authResponse, CancellationToken
cancellationToken)
- => Send(authResponse.AsBaseCommand(), cancellationToken);
+ private async Task Send(CommandAuthResponse command, CancellationToken
cancellationToken)
+ {
+ if (_authentication is not null)
+ {
+ if (command.Response is null)
+ command.Response = new AuthData();
+
+ command.Response.AuthMethodName =
_authentication.AuthenticationMethodName;
+ command.Response.Data = await
_authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false);
+ }
+
+ await Send(command.AsBaseCommand(),
cancellationToken).ConfigureAwait(false);
+ }
public Task Send(CommandPing command, CancellationToken cancellationToken)
=> Send(command.AsBaseCommand(), cancellationToken);
@@ -123,6 +133,12 @@ public sealed class Connection : IConnection
{
ThrowIfDisposed();
+ if (_authentication is not null)
+ {
+ command.AuthMethodName = _authentication.AuthenticationMethodName;
+ command.AuthData = await
_authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false);
+ }
+
Task<BaseCommand>? responseTask;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
@@ -291,19 +307,10 @@ public sealed class Connection : IConnection
if (command.CommandType == BaseCommand.Type.Message)
_channelManager.Incoming(command.Message, new
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+ else if (command.CommandType == BaseCommand.Type.AuthChallenge)
+ await Send(new CommandAuthResponse(),
cancellationToken).ConfigureAwait(false);
else
- {
- if (_accessTokenFactory != null && command.CommandType ==
BaseCommand.Type.AuthChallenge)
- {
- var token = await _accessTokenFactory.GetToken();
- await Send(new CommandAuthResponse { Response = new
AuthData { Data = Encoding.UTF8.GetBytes(token), AuthMethodName = "token" } },
cancellationToken);
- DotPulsarEventSource.Log.TokenRefreshed();
- }
- else
- {
- _channelManager.Incoming(command);
- }
- }
+ _channelManager.Incoming(command);
}
}
catch
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs
b/src/DotPulsar/Internal/ConnectionPool.cs
index 9e5dc2f..e1eccb9 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -15,13 +15,13 @@
namespace DotPulsar.Internal;
using Abstractions;
+using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Extensions;
using PulsarApi;
using System;
using System.Collections.Concurrent;
using System.Linq;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -37,8 +37,7 @@ public sealed class ConnectionPool : IConnectionPool
private readonly Task _closeInactiveConnections;
private readonly string? _listenerName;
private readonly TimeSpan _keepAliveInterval;
- private readonly IExecute _executor;
- private readonly Func<Task<string>>? _accessTokenFactory;
+ private readonly IAuthentication? _authentication;
public ConnectionPool(
CommandConnect commandConnect,
@@ -48,8 +47,7 @@ public sealed class ConnectionPool : IConnectionPool
TimeSpan closeInactiveConnectionsInterval,
string? listenerName,
TimeSpan keepAliveInterval,
- IExecute executor,
- Func<Task<string>>? accessTokenFactory)
+ IAuthentication? authentication)
{
_lock = new AsyncLock();
_commandConnect = commandConnect;
@@ -61,8 +59,7 @@ public sealed class ConnectionPool : IConnectionPool
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections =
CloseInactiveConnections(closeInactiveConnectionsInterval,
_cancellationTokenSource.Token);
_keepAliveInterval = keepAliveInterval;
- _executor = executor;
- _accessTokenFactory = accessTokenFactory;
+ _authentication = authentication;
}
public async ValueTask DisposeAsync()
@@ -112,7 +109,6 @@ public sealed class ConnectionPool : IConnectionPool
if (response.LookupTopicResponse.ProxyThroughServiceUrl)
{
- await connection.DisposeAsync();
var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
return await GetConnection(url,
cancellationToken).ConfigureAwait(false);
}
@@ -146,10 +142,8 @@ public sealed class ConnectionPool : IConnectionPool
}
}
- private ValueTask<Connection> GetConnection(Uri serviceUrl,
CancellationToken cancellationToken)
- {
- return GetConnection(new PulsarUrl(serviceUrl, serviceUrl),
cancellationToken);
- }
+ private async ValueTask<Connection> GetConnection(Uri serviceUrl,
CancellationToken cancellationToken)
+ => await GetConnection(new PulsarUrl(serviceUrl, serviceUrl),
cancellationToken).ConfigureAwait(false);
private async ValueTask<Connection> GetConnection(PulsarUrl url,
CancellationToken cancellationToken)
{
@@ -165,19 +159,12 @@ public sealed class ConnectionPool : IConnectionPool
private async Task<Connection> EstablishNewConnection(PulsarUrl url,
CancellationToken cancellationToken)
{
var stream = await
_connector.Connect(url.Physical).ConfigureAwait(false);
- var connection = new Connection(new PulsarStream(stream),
_keepAliveInterval, _accessTokenFactory);
+ var connection = new Connection(new PulsarStream(stream),
_keepAliveInterval, _authentication);
DotPulsarEventSource.Log.ConnectionCreated();
_connections[url] = connection;
- _ =
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t =>
DisposeConnection(url));
+ _ =
connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t
=> DisposeConnection(url));
var commandConnect = _commandConnect;
- if (_accessTokenFactory != null)
- {
- var token = await _accessTokenFactory.GetToken(_executor);
- commandConnect.AuthMethodName = "token";
- commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
- }
-
if (url.ProxyThroughServiceUrl)
commandConnect = WithProxyToBroker(commandConnect, url.Logical);
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 019340b..a42eeef 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -62,7 +62,6 @@ public sealed class DefaultExceptionHandler : IHandleException
SocketError.NetworkUnreachable => FaultAction.Rethrow,
_ => FaultAction.Retry
},
- TokenFactoryFailedException => FaultAction.Retry,
_ => FaultAction.Rethrow
};
}
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs
b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index aaccbc4..cf6c1fc 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -38,10 +38,6 @@ public sealed class DotPulsarEventSource
public void ReaderCreated() { }
public void ReaderDisposed() { }
-
- public void TokenRefreshed() { }
-
- public long TokenRefreshCount => 0;
}
#else
@@ -81,14 +77,11 @@ public sealed class DotPulsarEventSource : EventSource
private PollingCounter? _currentReadersCounter;
private long _currentReaders;
- private PollingCounter? _tokenRefreshCounter;
- private long _totalTokenRefreshes;
#pragma warning restore IDE0052 // Remove unread private members
public static readonly DotPulsarEventSource Log = new();
public DotPulsarEventSource() : base("DotPulsar") { }
- public long TokenRefreshCount => _totalTokenRefreshes;
public void ClientCreated()
{
@@ -145,11 +138,6 @@ public sealed class DotPulsarEventSource : EventSource
Interlocked.Decrement(ref _currentReaders);
}
- public void TokenRefreshed()
- {
- Interlocked.Increment(ref _totalTokenRefreshes);
- }
-
protected override void OnEventCommand(EventCommandEventArgs command)
{
if (command.Command != EventCommand.Enable)
@@ -204,11 +192,6 @@ public sealed class DotPulsarEventSource : EventSource
{
DisplayName = "Current number of readers"
};
-
- _tokenRefreshCounter ??= new PollingCounter("total-token-refreshes",
this, () => Volatile.Read(ref _totalTokenRefreshes))
- {
- DisplayName = "Number of times token was refreshed"
- };
}
}
#endif
diff --git a/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
deleted file mode 100644
index 7dee344..0000000
--- a/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-namespace DotPulsar.Internal.Exceptions;
-
-using System;
-
-public class TokenFactoryFailedException : Exception
-{
- public TokenFactoryFailedException(Exception innerException) :
base("Exception when trying to fetch token from token factory", innerException)
- {
- }
-}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs
b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index ddd1a52..92e3fbb 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -14,15 +14,12 @@
namespace DotPulsar.Internal;
-using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
-using System.Text;
-using System.Threading.Tasks;
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
@@ -38,14 +35,18 @@ public sealed class PulsarClientBuilder :
IPulsarClientBuilder
private bool _verifyCertificateAuthority;
private bool _verifyCertificateName;
private TimeSpan _closeInactiveConnectionsInterval;
- private Func<Task<string>>? _tokenFactory;
+ private IAuthentication? _authentication;
public PulsarClientBuilder()
{
_commandConnect = new CommandConnect
{
ProtocolVersion = Constants.ProtocolVersion,
- ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
+ ClientVersion = $"{Constants.ClientName}
{Constants.ClientVersion}",
+ FeatureFlags = new FeatureFlags
+ {
+ SupportsAuthRefresh = true
+ }
};
_exceptionHandlers = new List<IHandleException>();
@@ -67,17 +68,13 @@ public sealed class PulsarClientBuilder :
IPulsarClientBuilder
public IPulsarClientBuilder AuthenticateUsingToken(string token)
{
- _commandConnect.AuthMethodName = "token";
- _commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
+ _authentication = AuthenticationFactory.Token(token);
return this;
}
- public IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>>
tokenFactory)
+ public IPulsarClientBuilder Authentication(IAuthentication authentication)
{
- _tokenFactory = tokenFactory;
- var featureFlags = _commandConnect.FeatureFlags ?? new FeatureFlags();
- featureFlags.SupportsAuthRefresh = true;
- _commandConnect.FeatureFlags = featureFlags;
+ _authentication = authentication;
return this;
}
@@ -168,17 +165,9 @@ public sealed class PulsarClientBuilder :
IPulsarClientBuilder
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
{ new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new
ExceptionHandlerPipeline(exceptionHandlers);
- var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl,
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval,
_listenerName,
- _keepAliveInterval,
- new Executor(Guid.Empty, new EmptyRegisterEvent(),
exceptionHandlerPipeline),
- _tokenFactory);
+ var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl,
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval,
_listenerName, _keepAliveInterval, _authentication);
var processManager = new ProcessManager(connectionPool);
return new PulsarClient(connectionPool, processManager,
exceptionHandlerPipeline, _serviceUrl);
}
}
-
-internal class EmptyRegisterEvent : IRegisterEvent
-{
- public void Register(IEvent @event) { }
-}
diff --git a/src/DotPulsar/Internal/TokenAuthentication.cs
b/src/DotPulsar/Internal/TokenAuthentication.cs
new file mode 100644
index 0000000..a9f7ea1
--- /dev/null
+++ b/src/DotPulsar/Internal/TokenAuthentication.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+public sealed class TokenAuthentication : IAuthentication
+{
+ private readonly Func<CancellationToken, ValueTask<string>> _tokenSupplier;
+
+ public TokenAuthentication(string token) => _tokenSupplier =
(cancellationToken) => new ValueTask<string>(token);
+
+ public TokenAuthentication(Func<CancellationToken, ValueTask<string>>
tokenSupplier) => _tokenSupplier = tokenSupplier;
+
+ public string AuthenticationMethodName => "token";
+
+ public async ValueTask<byte[]> GetAuthenticationData(CancellationToken
cancellationToken)
+ {
+ var token = await
_tokenSupplier(cancellationToken).ConfigureAwait(false);
+ return System.Text.Encoding.UTF8.GetBytes(token);
+ }
+}
diff --git a/src/DotPulsar/Internal/TokenFactoryExtensions.cs
b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
deleted file mode 100644
index 3aa26bf..0000000
--- a/src/DotPulsar/Internal/TokenFactoryExtensions.cs
+++ /dev/null
@@ -1,26 +0,0 @@
-namespace DotPulsar.Internal;
-
-using Abstractions;
-using Exceptions;
-using System;
-using System.Threading.Tasks;
-
-internal static class TokenFactoryExtensions
-{
- public static async Task<string> GetToken(this Func<Task<string>>
tokenFactory, IExecute executor)
- {
- return await executor.Execute(tokenFactory.GetToken);
- }
-
- public static async Task<string> GetToken(this Func<Task<string>>
tokenFactory)
- {
- try
- {
- return await tokenFactory();
- }
- catch (Exception e)
- {
- throw new TokenFactoryFailedException(e);
- }
- }
-}
diff --git a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
index c1997b6..7ce98c4 100644
--- a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
+++ b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
@@ -7,7 +7,7 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="FluentAssertions" Version="6.3.0" />
+ <PackageReference Include="FluentAssertions" Version="6.4.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
similarity index 59%
rename from tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
rename to
tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
index 3c57911..f2c7884 100644
--- a/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterFixture.cs
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
namespace DotPulsar.IntegrationTests.Fixtures;
using Abstraction;
@@ -9,11 +23,11 @@ using System.Threading.Tasks;
using Xunit.Abstractions;
using Xunit.Sdk;
-public class TokenClusterFixture : PulsarServiceBase
+public class StandaloneTokenClusterFixture : PulsarServiceBase
{
private readonly IMessageSink _messageSink;
- public TokenClusterFixture(IMessageSink messageSink) : base(messageSink)
+ public StandaloneTokenClusterFixture(IMessageSink messageSink) :
base(messageSink)
{
_messageSink = messageSink;
}
@@ -24,7 +38,8 @@ public class TokenClusterFixture : PulsarServiceBase
{
await TakeDownPulsar(); // clean-up if anything was left running from
previous run
- await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f
docker-compose-standalone-token-tests.yml up -d")
+ await ProcessAsyncHelper
+ .ExecuteShellCommand("docker-compose", "-f
docker-compose-standalone-token-tests.yml up -d")
.ThrowOnFailure();
var waitTries = 10;
@@ -44,7 +59,7 @@ public class TokenClusterFixture : PulsarServiceBase
await
client.GetAsync($"{PulsarService.GetWebServiceUri()}/metrics/").ConfigureAwait(false);
return;
}
- catch(Exception e)
+ catch (Exception e)
{
_messageSink.OnMessage(new DiagnosticMessage("Error trying to
fetch metrics: {0}", e));
waitTries--;
@@ -63,26 +78,22 @@ public class TokenClusterFixture : PulsarServiceBase
public override Uri GetWebServiceUri() => new("http://localhost:54548");
private Task TakeDownPulsar()
- => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f
docker-compose-standalone-token-tests.yml down")
- .LogFailure(s => MessageSink.OnMessage(new
DiagnosticMessage("Error bringing down container: {0}", s)));
+ => ProcessAsyncHelper
+ .ExecuteShellCommand("docker-compose", "-f
docker-compose-standalone-token-tests.yml down")
+ .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error
bringing down container: {0}", s)));
public static async Task<string> GetAuthToken(bool includeExpiry)
{
var arguments = "exec pulsar-tokens bin/pulsar tokens create
--secret-key file:///appdata/my-secret.key --subject test-user";
if (includeExpiry)
- {
arguments += " --expiry-time 10s";
- }
- var result = await ProcessAsyncHelper.ExecuteShellCommand("docker",
- arguments);
+ var tokenCreateRequest = await
ProcessAsyncHelper.ExecuteShellCommand("docker", arguments);
- if (!result.Completed)
- {
- throw new InvalidOperationException($"Getting token from container
failed{Environment.NewLine}{result.Output}");
- }
+ if (!tokenCreateRequest.Completed)
+ throw new InvalidOperationException($"Getting token from container
failed: {tokenCreateRequest.Output}");
- return result.Output;
+ return tokenCreateRequest.Output;
}
}
diff --git
a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
deleted file mode 100644
index cb27b93..0000000
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-namespace DotPulsar.IntegrationTests.Fixtures;
-
-using Xunit;
-
-[CollectionDefinition(nameof(StandaloneTokenClusterTest))]
-public class StandaloneTokenClusterTest :
ICollectionFixture<TokenClusterFixture> { }
diff --git
a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
new file mode 100644
index 0000000..612e79c
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTests.cs
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Xunit;
+
+[CollectionDefinition(nameof(StandaloneTokenClusterTests))]
+public class StandaloneTokenClusterTests :
ICollectionFixture<StandaloneTokenClusterFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
index 56c00b8..c967563 100644
--- a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
+++ b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
@@ -1,10 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.IntegrationTests;
+
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
-namespace DotPulsar.IntegrationTests;
-
public static class ProcessAsyncHelper
{
public static async Task ThrowOnFailure(this Task<ProcessResult>
resultTask)
@@ -110,14 +124,12 @@ public static class ProcessAsyncHelper
return result;
}
-
private static async Task<bool> WaitForExitAsync(Process process)
{
await process.WaitForExitAsync();
return process.ExitCode == 0;
}
-
public struct ProcessResult
{
public bool Completed;
diff --git a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
index 5a0891a..e244cab 100644
--- a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
+++ b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
@@ -1,21 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
namespace DotPulsar.IntegrationTests;
using Abstraction;
using Abstractions;
using Extensions;
using Fixtures;
-using Internal;
using System;
-using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
-using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
-[Collection(nameof(StandaloneTokenClusterTest))]
+[Collection(nameof(StandaloneTokenClusterTests))]
public class TokenRefreshTests
{
public enum TokenTestRefreshType
@@ -30,7 +42,7 @@ public class TokenRefreshTests
private readonly ITestOutputHelper _testOutputHelper;
private readonly IPulsarService _pulsarService;
- public TokenRefreshTests(ITestOutputHelper outputHelper,
TokenClusterFixture fixture)
+ public TokenRefreshTests(ITestOutputHelper outputHelper,
StandaloneTokenClusterFixture fixture)
{
_testOutputHelper = outputHelper;
Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService !=
null");
@@ -44,11 +56,9 @@ public class TokenRefreshTests
[Theory]
public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int
timesToFail)
{
- var initialRefreshCount = DotPulsarEventSource.Log.TokenRefreshCount;
-
var publishingStarted = false;
var delayedNames = new HashSet<string>();
- Task<string> GetToken(string name, ref int count)
+ ValueTask<string> GetToken(string name, ref int count)
{
if (refreshType is TokenTestRefreshType.Standard)
{
@@ -57,74 +67,68 @@ public class TokenRefreshTests
if (refreshType is TokenTestRefreshType.FailAtStartup &&
!publishingStarted && ++count <= timesToFail)
{
- return Task.FromException<string>(new Exception("Initial Token
Failed"));
+ return ValueTask.FromException<string>(new Exception("Initial
Token Failed"));
}
if (refreshType is TokenTestRefreshType.FailOnRefresh &&
publishingStarted && ++count <= timesToFail)
{
- return Task.FromException<string>(count == 1 ? new
Exception("Refresh Failed") : new Exception("Initial Token Failed"));
+ return ValueTask.FromException<string>(count == 1 ? new
Exception("Refresh Failed") : new Exception("Initial Token Failed"));
}
if (refreshType is TokenTestRefreshType.TimeoutOnRefresh &&
publishingStarted && !delayedNames.Contains(name))
{
delayedNames.Add(name);
- return Task.Delay(6000).ContinueWith(_ =>
GetAuthToken(name)).Unwrap();
+ Task.Delay(6000);
}
return GetAuthToken(name);
}
var producerTokenCount = 0;
- await using var producerClient = GetPulsarClient("Producer", ()
- => GetToken("Producer", ref producerTokenCount));
+ await using var producerClient = GetPulsarClient("Producer", (ct) =>
GetToken("Producer", ref producerTokenCount));
var consumerTokenCount = 0;
- await using var consumerClient = GetPulsarClient("Consumer", ()
- => GetToken("Consumer", ref consumerTokenCount));
+ await using var consumerClient = GetPulsarClient("Consumer", (ct) =>
GetToken("Consumer", ref consumerTokenCount));
- var producer = CreateProducer(producerClient);
+ await using var producer = producerClient.NewProducer(Schema.String)
+ .Topic(MyTopic)
+ .StateChangedHandler(Monitor)
+ .Create();
- var consumer = consumerClient.NewConsumer(Schema.String)
+ await using var consumer = consumerClient.NewConsumer(Schema.String)
.Topic(MyTopic)
+ .StateChangedHandler(Monitor)
.SubscriptionName("test-sub")
.InitialPosition(SubscriptionInitialPosition.Earliest)
.Create();
- var received = new List<string>();
const int messageCount = 20;
+ var received = new List<string>(messageCount);
var publisherTask = Task.Run(async () =>
{
for (var i = 0; i < messageCount; i++)
{
_testOutputHelper.WriteLine("Trying to publish message for
index {0}", i);
- var messageId = await
producer.Send(Encoding.UTF8.GetBytes(i.ToString()));
+ var messageId = await producer.Send(i.ToString());
publishingStarted = true;
_testOutputHelper.WriteLine("Published message {0} for index
{1}", messageId, i);
-
await Task.Delay(1000);
}
});
var consumerTask = Task.Run(async () =>
{
- await consumer.OnStateChangeTo(ConsumerState.Active);
- for (int j = 0; j < messageCount; j++)
+ for (var i = 0; i < messageCount; i++)
{
var message = await consumer.Receive();
- received.Add(Encoding.UTF8.GetString(message.Data));
+ received.Add(message.Value());
}
});
- var all = Task.WhenAll(consumerTask, publisherTask);
var timeoutTask = Task.Delay(60_000);
- var result = await Task.WhenAny(all, timeoutTask);
- Assert.True(result != timeoutTask);
-
- if (refreshType is TokenTestRefreshType.Standard)
- {
- Assert.True(DotPulsarEventSource.Log.TokenRefreshCount >
initialRefreshCount);
- }
+ await Task.WhenAny(Task.WhenAll(consumerTask, publisherTask),
timeoutTask);
+ Assert.False(timeoutTask.IsCompleted);
var expected = Enumerable.Range(0, messageCount).Select(i =>
i.ToString()).ToList();
var missing = expected.Except(received).ToList();
@@ -135,22 +139,53 @@ public class TokenRefreshTests
}
}
- private static IProducer<ReadOnlySequence<byte>>
CreateProducer(IPulsarClient producerClient)
- => producerClient.NewProducer()
- .Topic(MyTopic)
- .Create();
-
- private IPulsarClient GetPulsarClient(string name, Func<Task<string>>
tokenFactory)
+ private IPulsarClient GetPulsarClient(string name, Func<CancellationToken,
ValueTask<string>> tokenFactory)
=> PulsarClient.Builder()
- .AuthenticateUsingToken(tokenFactory)
+ .Authentication(AuthenticationFactory.Token(tokenFactory))
.RetryInterval(TimeSpan.FromSeconds(1))
- .ExceptionHandler(ec => _testOutputHelper.WriteLine("Error
(handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name,
ec.Exception))
+ .ExceptionHandler(ec =>
+ {
+ _testOutputHelper.WriteLine("Error (handled={0}) occurred in
{1} client: {2}", ec.ExceptionHandled, name, ec.Exception);
+ })
.ServiceUrl(_pulsarService.GetBrokerUri()).Build();
- private async Task<string> GetAuthToken(string name)
+ private async ValueTask<string> GetAuthToken(string name)
{
- var result = await TokenClusterFixture.GetAuthToken(true);
+ var result = await StandaloneTokenClusterFixture.GetAuthToken(true);
_testOutputHelper.WriteLine("{0} received token {1}", name, result);
return result;
}
+
+ private void Monitor(ProducerStateChanged stateChanged, CancellationToken
cancellationToken)
+ {
+ var stateMessage = stateChanged.ProducerState switch
+ {
+ ProducerState.Connected => "is connected",
+ ProducerState.Disconnected => "is disconnected",
+ ProducerState.PartiallyConnected => "is partially connected",
+ ProducerState.Closed => "has closed",
+ ProducerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{stateChanged.ProducerState}'"
+ };
+
+ var topic = stateChanged.Producer.Topic;
+ _testOutputHelper.WriteLine($"The producer for topic '{topic}' " +
stateMessage);
+ }
+
+ private void Monitor(ConsumerStateChanged stateChanged, CancellationToken
cancellationToken)
+ {
+ var stateMessage = stateChanged.ConsumerState switch
+ {
+ ConsumerState.Active => "is active",
+ ConsumerState.Inactive => "is inactive",
+ ConsumerState.Disconnected => "is disconnected",
+ ConsumerState.Closed => "has closed",
+ ConsumerState.ReachedEndOfTopic => "has reached end of topic",
+ ConsumerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{stateChanged.ConsumerState}'"
+ };
+
+ var topic = stateChanged.Consumer.Topic;
+ _testOutputHelper.WriteLine($"The consumer for topic '{topic}' " +
stateMessage);
+ }
}
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 5524900..2d5d538 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -16,7 +16,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers;
buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="FluentAssertions" Version="6.3.0" />
+ <PackageReference Include="FluentAssertions" Version="6.4.0" />
</ItemGroup>
<ItemGroup>
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 55fd489..4d44986 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -6,8 +6,8 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="FluentAssertions" Version="6.3.0" />
- <PackageReference Include="NSubstitute" Version="4.2.2" />
+ <PackageReference Include="FluentAssertions" Version="6.4.0" />
+ <PackageReference Include="NSubstitute" Version="4.3.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">