This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36370dd Refactoring and (slowly) making ready to support batching
when sending. Needs more automated tests to test reconnectivity, ping/pong and
inactivity
36370dd is described below
commit 36370dd068dee155aaf69e4c1676343b98a25789
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Sep 22 23:40:27 2023 +0200
Refactoring and (slowly) making ready to support batching when sending.
Needs more automated tests to test reconnectivity, ping/pong and inactivity
---
src/DotPulsar/Internal/Abstractions/IConnection.cs | 9 +-
src/DotPulsar/Internal/ChannelManager.cs | 66 +++++++++++---
src/DotPulsar/Internal/ChannelManagerState.cs | 22 +++++
src/DotPulsar/Internal/Connection.cs | 100 ++++++++++++++++-----
src/DotPulsar/Internal/ConnectionPool.cs | 50 ++---------
src/DotPulsar/Internal/ConnectionState.cs | 22 +++++
src/DotPulsar/Internal/PingPongHandler.cs | 73 ++++++---------
src/DotPulsar/Internal/PingPongHandlerState.cs | 23 +++++
src/DotPulsar/Internal/SubConsumer.cs | 4 +-
.../Internal/PingPongHandlerTest.cs | 59 ++++++++----
10 files changed, 277 insertions(+), 151 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index fb8ac1f..f190ef2 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,14 +14,15 @@
namespace DotPulsar.Internal.Abstractions;
+using DotPulsar.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
-public interface IConnection : IAsyncDisposable
+public interface IConnection : IState<ConnectionState>, IAsyncDisposable
{
- ValueTask<bool> HasChannels(CancellationToken cancellationToken);
+ public int MaxMessageSize { get; }
Task<ProducerResponse> Send(CommandProducer command, IChannel channel,
CancellationToken cancellationToken);
Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel,
CancellationToken cancellationToken);
@@ -42,6 +43,4 @@ public interface IConnection : IAsyncDisposable
Task Send(SendPackage command, TaskCompletionSource<BaseCommand>
responseTcs, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken
cancellationToken);
Task<BaseCommand> Send(CommandPartitionedTopicMetadata command,
CancellationToken cancellationToken);
-
- Task<IConnection> WaitForInactive();
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index 87a1fa3..ddfa99a 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -14,15 +14,18 @@
namespace DotPulsar.Internal;
+using DotPulsar.Abstractions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Buffers;
+using System.Threading;
using System.Threading.Tasks;
-public sealed class ChannelManager : IDisposable
+public sealed class ChannelManager : IState<ChannelManagerState>, IDisposable
{
+ private readonly StateManager<ChannelManagerState> _stateManager;
private readonly RequestResponseHandler _requestResponseHandler;
private readonly IdLookup<IChannel> _consumerChannels;
private readonly IdLookup<IChannel> _producerChannels;
@@ -30,6 +33,7 @@ public sealed class ChannelManager : IDisposable
public ChannelManager()
{
+ _stateManager = new
StateManager<ChannelManagerState>(ChannelManagerState.Inactive,
ChannelManagerState.Closed);
_requestResponseHandler = new RequestResponseHandler();
_consumerChannels = new IdLookup<IChannel>();
_producerChannels = new IdLookup<IChannel>();
@@ -40,12 +44,9 @@ public sealed class ChannelManager : IDisposable
_incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd =>
Incoming(cmd.ReachedEndOfTopic));
}
- public bool HasChannels()
- => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
-
public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel
channel)
{
- var producerId = _producerChannels.Add(channel);
+ var producerId = AddProducerChannel(channel);
command.ProducerId = producerId;
var response = _requestResponseHandler.Outgoing(command);
@@ -53,7 +54,7 @@ public sealed class ChannelManager : IDisposable
{
if (result.Result.CommandType == BaseCommand.Type.Error)
{
- _ = _producerChannels.Remove(producerId);
+ _ = RemoveProducerChannel(producerId);
result.Result.Error.Throw();
}
@@ -73,7 +74,7 @@ public sealed class ChannelManager : IDisposable
public Task<SubscribeResponse> Outgoing(CommandSubscribe command, IChannel
channel)
{
- var consumerId = _consumerChannels.Add(channel);
+ var consumerId = AddConsumerChannel(channel);
command.ConsumerId = consumerId;
var response = _requestResponseHandler.Outgoing(command);
@@ -81,7 +82,7 @@ public sealed class ChannelManager : IDisposable
{
if (result.Result.CommandType == BaseCommand.Type.Error)
{
- _ = _consumerChannels.Remove(consumerId);
+ _ = RemoveConsumerChannel(consumerId);
result.Result.Error.Throw();
}
@@ -105,7 +106,7 @@ public sealed class ChannelManager : IDisposable
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _ = _consumerChannels.Remove(consumerId);
+ _ = RemoveConsumerChannel(consumerId);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
return response;
@@ -125,7 +126,7 @@ public sealed class ChannelManager : IDisposable
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _ = _producerChannels.Remove(producerId);
+ _ = RemoveProducerChannel(producerId);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
return response;
@@ -145,7 +146,7 @@ public sealed class ChannelManager : IDisposable
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _consumerChannels.Remove(consumerId)?.Unsubscribed();
+ RemoveConsumerChannel(consumerId)?.Unsubscribed();
}, TaskContinuationOptions.OnlyOnRanToCompletion);
return response;
@@ -195,6 +196,7 @@ public sealed class ChannelManager : IDisposable
public void Dispose()
{
+ _stateManager.SetState(ChannelManagerState.Closed);
_requestResponseHandler.Dispose();
foreach (var channel in _consumerChannels.RemoveAll())
@@ -209,7 +211,7 @@ public sealed class ChannelManager : IDisposable
private void Incoming(CommandCloseConsumer command)
{
- var channel = _consumerChannels.Remove(command.ConsumerId);
+ var channel = RemoveConsumerChannel(command.ConsumerId);
if (channel is null)
return;
@@ -219,7 +221,7 @@ public sealed class ChannelManager : IDisposable
private void Incoming(CommandCloseProducer command)
{
- var channel = _producerChannels.Remove(command.ProducerId);
+ var channel = RemoveProducerChannel(command.ProducerId);
if (channel is null)
return;
@@ -275,4 +277,42 @@ public sealed class ChannelManager : IDisposable
successAction.Invoke(response.Result.ProducerSuccess.TopicEpoch);
});
}
+
+ private ulong AddProducerChannel(IChannel channel) =>
AddChannel(_producerChannels, channel);
+
+ private ulong AddConsumerChannel(IChannel channel) =>
AddChannel(_consumerChannels, channel);
+
+ private ulong AddChannel(IdLookup<IChannel> lookup, IChannel channel)
+ {
+ var id = lookup.Add(channel);
+ _stateManager.SetState(ChannelManagerState.Active);
+ return id;
+ }
+
+ private IChannel? RemoveProducerChannel(ulong producerId) =>
RemoveChannel(_producerChannels, producerId);
+
+ private IChannel? RemoveConsumerChannel(ulong consumerId) =>
RemoveChannel(_consumerChannels, consumerId);
+
+ private IChannel? RemoveChannel(IdLookup<IChannel> lookup, ulong id)
+ {
+ var channel = lookup.Remove(id);
+ ChannelRemoved();
+ return channel;
+ }
+
+ private void ChannelRemoved()
+ {
+ if (_consumerChannels.IsEmpty() && _producerChannels.IsEmpty())
+ _stateManager.SetState(ChannelManagerState.Inactive);
+ }
+
+ public bool IsFinalState() => _stateManager.IsFinalState();
+
+ public bool IsFinalState(ChannelManagerState state) =>
_stateManager.IsFinalState(state);
+
+ public async ValueTask<ChannelManagerState>
OnStateChangeTo(ChannelManagerState state, CancellationToken cancellationToken
= default)
+ => await _stateManager.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask<ChannelManagerState>
OnStateChangeFrom(ChannelManagerState state, CancellationToken
cancellationToken = default)
+ => await _stateManager.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ChannelManagerState.cs b/src/DotPulsar/Internal/ChannelManagerState.cs
new file mode 100644
index 0000000..ba91ecb
--- /dev/null
+++ b/src/DotPulsar/Internal/ChannelManagerState.cs
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+public enum ChannelManagerState : byte
+{
+ Active,
+ Inactive,
+ Closed
+}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 952ece9..1b72be8 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Exceptions;
using DotPulsar.Internal.Extensions;
@@ -26,37 +27,58 @@ using System.Threading.Tasks;
public sealed class Connection : IConnection
{
+ private readonly StateManager<ConnectionState> _stateManager;
private readonly AsyncLock _lock;
+ private readonly CancellationTokenSource _cancellationTokenSource;
private readonly ChannelManager _channelManager;
private readonly PingPongHandler _pingPongHandler;
private readonly IPulsarStream _stream;
private readonly IAuthentication? _authentication;
+ private readonly TimeSpan _closeOnInactiveInterval;
private int _isDisposed;
- private readonly TaskCompletionSource<IConnection> _inactiveTaskSource;
- public Connection(IPulsarStream stream, TimeSpan keepAliveInterval,
IAuthentication? authentication)
+ private Connection(IPulsarStream stream, IAuthentication? authentication,
TimeSpan keepAliveInterval, TimeSpan closeOnInactiveInterval)
{
+ _stateManager = new
StateManager<ConnectionState>(ConnectionState.Connected,
ConnectionState.Disconnected, ConnectionState.Closed);
_lock = new AsyncLock();
+ _cancellationTokenSource = new CancellationTokenSource();
_channelManager = new ChannelManager();
- _inactiveTaskSource = new TaskCompletionSource<IConnection>();
- _pingPongHandler = new PingPongHandler(this, keepAliveInterval, () =>
- {
- _inactiveTaskSource.TrySetResult(this);
- });
+ _pingPongHandler = new PingPongHandler(keepAliveInterval);
_stream = stream;
_authentication = authentication;
+ _closeOnInactiveInterval = closeOnInactiveInterval;
+ _ = Task.Factory.StartNew(() => Setup(_cancellationTokenSource.Token));
}
- public async ValueTask<bool> HasChannels(CancellationToken
cancellationToken)
+ public static async Task<Connection> Connect(
+ IPulsarStream stream,
+ IAuthentication? authentication,
+ CommandConnect commandConnect,
+ TimeSpan keepAliveInterval,
+ TimeSpan closeOnInactiveInterval,
+ CancellationToken cancellationToken)
{
- ThrowIfDisposed();
+ Connection? connection = null;
- using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ try
+ {
+ connection = new Connection(stream, authentication,
keepAliveInterval, closeOnInactiveInterval);
+ var response = await connection.Send(commandConnect,
cancellationToken).ConfigureAwait(false);
+ response.Expect(BaseCommand.Type.Connected);
+ connection.MaxMessageSize = response.Connected.MaxMessageSize;
+ DotPulsarMeter.ConnectionCreated();
+ return connection;
+ }
+ catch
{
- return _channelManager.HasChannels();
+ if (connection is not null)
+ await connection.DisposeAsync().ConfigureAwait(false);
+ throw;
}
}
+ public int MaxMessageSize { get; private set; }
+
public async Task<ProducerResponse> Send(CommandProducer command, IChannel
channel, CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -91,8 +113,6 @@ public sealed class Connection : IConnection
private async Task Send(CommandAuthResponse command, CancellationToken
cancellationToken)
{
- await Task.Yield();
-
if (_authentication is not null)
{
command.Response ??= new AuthData();
@@ -300,10 +320,32 @@ public sealed class Connection : IConnection
}
}
- public async Task ProcessIncomingFrames(CancellationToken
cancellationToken)
+ private async Task Setup(CancellationToken cancellationToken)
+ {
+ var incoming = ProcessIncomingFrames(cancellationToken);
+ var channelManager =
_channelManager.OnStateChangeTo(ChannelManagerState.Inactive,
_closeOnInactiveInterval, cancellationToken).AsTask();
+ var pingPongTimeOut =
_pingPongHandler.OnStateChangeTo(PingPongHandlerState.TimedOut,
cancellationToken).AsTask();
+ _ = Task.Factory.StartNew(async () => await
KeepAlive(PingPongHandlerState.Active,
cancellationToken).ConfigureAwait(false));
+ await Task.WhenAny(incoming, channelManager,
pingPongTimeOut).ConfigureAwait(false);
+ _stateManager.SetState(ConnectionState.Disconnected);
+ }
+
+ private async Task KeepAlive(PingPongHandlerState state, CancellationToken
cancellationToken)
{
- await Task.Yield();
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ state = await _pingPongHandler.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ if (state == PingPongHandlerState.TimedOut)
+ return;
+ if (state == PingPongHandlerState.Active)
+ continue;
+ if (state == PingPongHandlerState.ThresholdExceeded)
+ await Send(new CommandPing(),
cancellationToken).ConfigureAwait(false);
+ }
+ }
+ private async Task ProcessIncomingFrames(CancellationToken
cancellationToken)
+ {
try
{
await foreach (var frame in _stream.Frames(cancellationToken))
@@ -311,13 +353,16 @@ public sealed class Connection : IConnection
var commandSize = frame.ReadUInt32(0, true);
var command =
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
- if (_pingPongHandler.Incoming(command.CommandType))
- continue;
+ _pingPongHandler.Incoming(command.CommandType);
if (command.CommandType == BaseCommand.Type.Message)
_channelManager.Incoming(command.Message, new
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
else if (command.CommandType == BaseCommand.Type.AuthChallenge)
- _ = Send(new CommandAuthResponse(),
cancellationToken).ConfigureAwait(false);
+ _ = Task.Factory.StartNew(async () => await Send(new
CommandAuthResponse(), cancellationToken).ConfigureAwait(false));
+ else if (command.CommandType == BaseCommand.Type.Ping)
+ _ = Task.Factory.StartNew(async () => await Send(new
CommandPong(), cancellationToken).ConfigureAwait(false));
+ else if (command.CommandType == BaseCommand.Type.Pong)
+ continue;
else
_channelManager.Incoming(command);
}
@@ -333,20 +378,29 @@ public sealed class Connection : IConnection
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
+ DotPulsarMeter.ConnectionDisposed();
+
+ _stateManager.SetState(ConnectionState.Closed);
+ _cancellationTokenSource.Cancel();
await _pingPongHandler.DisposeAsync().ConfigureAwait(false);
await _lock.DisposeAsync().ConfigureAwait(false);
_channelManager.Dispose();
await _stream.DisposeAsync().ConfigureAwait(false);
}
- public Task<IConnection> WaitForInactive()
- {
- return _inactiveTaskSource.Task;
- }
-
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
throw new ConnectionDisposedException();
}
+
+ public bool IsFinalState() => _stateManager.IsFinalState();
+
+ public bool IsFinalState(ConnectionState state) =>
_stateManager.IsFinalState(state);
+
+ public async ValueTask<ConnectionState> OnStateChangeTo(ConnectionState
state, CancellationToken cancellationToken = default)
+ => await _stateManager.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask<ConnectionState> OnStateChangeFrom(ConnectionState
state, CancellationToken cancellationToken = default)
+ => await _stateManager.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 939eef7..124b50d 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -34,8 +34,8 @@ public sealed class ConnectionPool : IConnectionPool
private readonly EncryptionPolicy _encryptionPolicy;
private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
private readonly CancellationTokenSource _cancellationTokenSource;
- private readonly Task _closeInactiveConnections;
private readonly string? _listenerName;
+ private readonly TimeSpan _closeInactiveConnectionsInterval;
private readonly TimeSpan _keepAliveInterval;
private readonly IAuthentication? _authentication;
@@ -57,7 +57,7 @@ public sealed class ConnectionPool : IConnectionPool
_listenerName = listenerName;
_connections = new ConcurrentDictionary<PulsarUrl, Connection>();
_cancellationTokenSource = new CancellationTokenSource();
- _closeInactiveConnections =
CloseInactiveConnections(closeInactiveConnectionsInterval,
_cancellationTokenSource.Token);
+ _closeInactiveConnectionsInterval = closeInactiveConnectionsInterval;
_keepAliveInterval = keepAliveInterval;
_authentication = authentication;
}
@@ -66,8 +66,6 @@ public sealed class ConnectionPool : IConnectionPool
{
_cancellationTokenSource.Cancel();
- await _closeInactiveConnections.ConfigureAwait(false);
-
await _lock.DisposeAsync().ConfigureAwait(false);
foreach (var serviceUrl in _connections.Keys.ToArray())
@@ -159,28 +157,21 @@ public sealed class ConnectionPool : IConnectionPool
private async Task<Connection> EstablishNewConnection(PulsarUrl url,
CancellationToken cancellationToken)
{
var stream = await
_connector.Connect(url.Physical).ConfigureAwait(false);
- var connection = new Connection(new PulsarStream(stream),
_keepAliveInterval, _authentication);
- _ = connection.WaitForInactive().ContinueWith(async _ => await
DisposeConnection(url).ConfigureAwait(false), cancellationToken);
- DotPulsarMeter.ConnectionCreated();
- _connections[url] = connection;
- _ =
connection.ProcessIncomingFrames(_cancellationTokenSource.Token).ContinueWith(t
=> DisposeConnection(url));
- var commandConnect = _commandConnect;
+ var commandConnect = _commandConnect;
if (url.ProxyThroughServiceUrl)
commandConnect = WithProxyToBroker(commandConnect, url.Logical);
- var response = await connection.Send(commandConnect,
cancellationToken).ConfigureAwait(false);
- response.Expect(BaseCommand.Type.Connected);
+ var connection = await Connection.Connect(new PulsarStream(stream),
_authentication, commandConnect, _keepAliveInterval,
_closeInactiveConnectionsInterval, cancellationToken).ConfigureAwait(false);
+ _connections[url] = connection;
+ _ =
connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t
=> DisposeConnection(url));
return connection;
}
private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
{
if (_connections.TryRemove(serviceUrl, out var connection) &&
connection is not null)
- {
await connection.DisposeAsync().ConfigureAwait(false);
- DotPulsarMeter.ConnectionDisposed();
- }
}
private static CommandConnect WithProxyToBroker(CommandConnect
commandConnect, Uri logicalUrl)
@@ -200,35 +191,6 @@ public sealed class ConnectionPool : IConnectionPool
};
}
- private async Task CloseInactiveConnections(TimeSpan interval,
CancellationToken cancellationToken)
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- try
- {
- await Task.Delay(interval,
cancellationToken).ConfigureAwait(false);
-
- using (await
_lock.Lock(cancellationToken).ConfigureAwait(false))
- {
- var serviceUrls = _connections.Keys;
- foreach (var serviceUrl in serviceUrls)
- {
- var connection = _connections[serviceUrl];
- if (connection is null)
- continue;
-
- if (!await
connection.HasChannels(cancellationToken).ConfigureAwait(false))
- await
DisposeConnection(serviceUrl).ConfigureAwait(false);
- }
- }
- }
- catch
- {
- // ignored
- }
- }
- }
-
private sealed class PulsarUrl : IEquatable<PulsarUrl>
{
public PulsarUrl(Uri physical, Uri logical)
diff --git a/src/DotPulsar/Internal/ConnectionState.cs b/src/DotPulsar/Internal/ConnectionState.cs
new file mode 100644
index 0000000..d1d693e
--- /dev/null
+++ b/src/DotPulsar/Internal/ConnectionState.cs
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+public enum ConnectionState : byte
+{
+ Connected,
+ Disconnected,
+ Closed
+}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index 0fd670f..c89a3fb 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,48 +14,34 @@
namespace DotPulsar.Internal;
-using DotPulsar.Internal.Abstractions;
+using DotPulsar.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
-public sealed class PingPongHandler : IAsyncDisposable
+public sealed class PingPongHandler : IState<PingPongHandlerState>,
IAsyncDisposable
{
- private readonly IConnection _connection;
+ private readonly StateManager<PingPongHandlerState> _stateManager;
private readonly TimeSpan _keepAliveInterval;
private readonly Timer _timer;
- private readonly CommandPing _ping;
- private readonly CommandPong _pong;
private long _lastCommand;
private bool _waitForPong;
- private readonly Action _inactiveCallback;
- public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval,
Action inactiveCallback)
+ public PingPongHandler(TimeSpan keepAliveInterval)
{
- _connection = connection;
+ _stateManager = new
StateManager<PingPongHandlerState>(PingPongHandlerState.Active,
PingPongHandlerState.TimedOut, PingPongHandlerState.Closed);
_keepAliveInterval = keepAliveInterval;
_timer = new Timer(Watch);
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
- _ping = new CommandPing();
- _pong = new CommandPong();
_lastCommand = Stopwatch.GetTimestamp();
- _inactiveCallback = inactiveCallback;
}
- public bool Incoming(BaseCommand.Type commandType)
+ public void Incoming(BaseCommand.Type _)
{
Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());
_waitForPong = false;
-
- if (commandType == BaseCommand.Type.Ping)
- {
- Task.Factory.StartNew(() => SendPong());
- return true;
- }
-
- return commandType == BaseCommand.Type.Pong;
}
private void Watch(object? state)
@@ -65,22 +51,23 @@ public sealed class PingPongHandler : IAsyncDisposable
var lastCommand = Interlocked.Read(ref _lastCommand);
var now = Stopwatch.GetTimestamp();
var elapsed = TimeSpan.FromSeconds((now - lastCommand) /
Stopwatch.Frequency);
- if (elapsed >= _keepAliveInterval)
+ if (elapsed > _keepAliveInterval)
{
if (_waitForPong)
{
- _inactiveCallback();
+ _stateManager.SetState(PingPongHandlerState.TimedOut);
return;
}
- Task.Factory.StartNew(() =>
- {
- _waitForPong = true;
- return SendPing();
- });
+
+ _waitForPong = true;
+ _stateManager.SetState(PingPongHandlerState.ThresholdExceeded);
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
}
else
+ {
+ _stateManager.SetState(PingPongHandlerState.Active);
_timer.Change(_keepAliveInterval.Subtract(elapsed),
TimeSpan.Zero);
+ }
}
catch
{
@@ -88,34 +75,28 @@ public sealed class PingPongHandler : IAsyncDisposable
}
}
- private async Task SendPing()
- {
- try
- {
- await _connection.Send(_ping, default).ConfigureAwait(false);
- }
- catch { }
- }
-
- private async Task SendPong()
- {
- try
- {
- await _connection.Send(_pong, default).ConfigureAwait(false);
- }
- catch { }
- }
-
#if NETSTANDARD2_0
public ValueTask DisposeAsync()
{
_timer.Dispose();
+ _stateManager.SetState(PingPongHandlerState.Closed);
return new ValueTask();
}
#else
public async ValueTask DisposeAsync()
{
await _timer.DisposeAsync().ConfigureAwait(false);
+ _stateManager.SetState(PingPongHandlerState.Closed);
}
#endif
+
+ public bool IsFinalState() => _stateManager.IsFinalState();
+
+ public bool IsFinalState(PingPongHandlerState state) =>
_stateManager.IsFinalState(state);
+
+ public async ValueTask<PingPongHandlerState>
OnStateChangeTo(PingPongHandlerState state, CancellationToken cancellationToken
= default)
+ => await _stateManager.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask<PingPongHandlerState>
OnStateChangeFrom(PingPongHandlerState state, CancellationToken
cancellationToken = default)
+ => await _stateManager.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/PingPongHandlerState.cs b/src/DotPulsar/Internal/PingPongHandlerState.cs
new file mode 100644
index 0000000..ca945d6
--- /dev/null
+++ b/src/DotPulsar/Internal/PingPongHandlerState.cs
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+public enum PingPongHandlerState : byte
+{
+ Active,
+ ThresholdExceeded,
+ TimedOut,
+ Closed
+}
diff --git a/src/DotPulsar/Internal/SubConsumer.cs b/src/DotPulsar/Internal/SubConsumer.cs
index 1c1d6dd..3e56fdf 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -86,7 +86,7 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ProducerDisposed(_correlationId));
+ _eventRegister.Register(new ConsumerDisposed(_correlationId));
await DisposeChannel().ConfigureAwait(false);
}
diff --git a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
index 42ae214..0318588 100644
--- a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
+++ b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
@@ -15,13 +15,8 @@
namespace DotPulsar.Tests.Internal;
using DotPulsar.Internal;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
using FluentAssertions;
-using NSubstitute;
-using NSubstitute.ClearExtensions;
using System;
-using System.Threading;
using System.Threading.Tasks;
using Xunit;
@@ -29,19 +24,47 @@ using Xunit;
public class PingPongHandlerTest
{
[Fact]
- public async Task Watch_GivenConnectionNotAlive_ShouldDisposeConnection()
+ public async Task
Constructor_GivenNoIncomingCommands_ShouldChangeStateToTimedOut()
{
- var connection = Substitute.For<IConnection>();
- var keepAliveInterval = TimeSpan.FromSeconds(1);
- var isActive = true;
- var pingPongHandler = new PingPongHandler(connection,
keepAliveInterval, () => isActive = false);
-
- connection.When(c => c.Send(Arg.Any<CommandPing>(),
Arg.Any<CancellationToken>())).Do(c =>
pingPongHandler.Incoming(BaseCommand.Type.Pong));
- await Task.Delay(3 * keepAliveInterval);
- isActive.Should().BeTrue();
-
- connection.ClearSubstitute();
- await Task.Delay(3 * keepAliveInterval);
- isActive.Should().BeFalse();
+ // Arrange
+ var expected = PingPongHandlerState.TimedOut;
+ var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
+
+ // Act
+ var actual = await uut.OnStateChangeTo(PingPongHandlerState.TimedOut);
+
+ // Assert
+ actual.Should().Be(expected);
+ }
+
+ [Fact]
+ public async Task
Incoming_GivenIncomingCommandAfterThresholdExceeded_ShouldChangeStateToActive()
+ {
+ // Arrange
+ var expected = PingPongHandlerState.Active;
+ var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
+
+ // Act
+ _ = await uut.OnStateChangeTo(PingPongHandlerState.ThresholdExceeded);
+ uut.Incoming(DotPulsar.Internal.PulsarApi.BaseCommand.Type.Ack);
+ var actual = await uut.OnStateChangeTo(PingPongHandlerState.Active);
+
+ // Assert
+ actual.Should().Be(expected);
+ }
+
+ [Fact]
+ public async Task
Dispose_GivenTheStateWasActive_ShouldChangeStateToClosed()
+ {
+ // Arrange
+ var expected = PingPongHandlerState.Closed;
+ var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
+
+ // Act
+ var actual = uut.OnStateChangeTo(PingPongHandlerState.Closed);
+ await uut.DisposeAsync();
+
+ // Assert
+ (await actual).Should().Be(expected);
}
}