This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36370dd  Refactoring and (slowly) making ready to support batching 
when sending. Needs more automated tests to test reconnectivity, ping/pong and 
inactivity
36370dd is described below

commit 36370dd068dee155aaf69e4c1676343b98a25789
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Sep 22 23:40:27 2023 +0200

    Refactoring and (slowly) making ready to support batching when sending. 
Needs more automated tests to test reconnectivity, ping/pong and inactivity
---
 src/DotPulsar/Internal/Abstractions/IConnection.cs |   9 +-
 src/DotPulsar/Internal/ChannelManager.cs           |  66 +++++++++++---
 src/DotPulsar/Internal/ChannelManagerState.cs      |  22 +++++
 src/DotPulsar/Internal/Connection.cs               | 100 ++++++++++++++++-----
 src/DotPulsar/Internal/ConnectionPool.cs           |  50 ++---------
 src/DotPulsar/Internal/ConnectionState.cs          |  22 +++++
 src/DotPulsar/Internal/PingPongHandler.cs          |  73 ++++++---------
 src/DotPulsar/Internal/PingPongHandlerState.cs     |  23 +++++
 src/DotPulsar/Internal/SubConsumer.cs              |   4 +-
 .../Internal/PingPongHandlerTest.cs                |  59 ++++++++----
 10 files changed, 277 insertions(+), 151 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs 
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index fb8ac1f..f190ef2 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,14 +14,15 @@
 
 namespace DotPulsar.Internal.Abstractions;
 
+using DotPulsar.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Threading;
 using System.Threading.Tasks;
 
-public interface IConnection : IAsyncDisposable
+public interface IConnection : IState<ConnectionState>, IAsyncDisposable
 {
-    ValueTask<bool> HasChannels(CancellationToken cancellationToken);
+    public int MaxMessageSize { get; }
 
     Task<ProducerResponse> Send(CommandProducer command, IChannel channel, 
CancellationToken cancellationToken);
     Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, 
CancellationToken cancellationToken);
@@ -42,6 +43,4 @@ public interface IConnection : IAsyncDisposable
     Task Send(SendPackage command, TaskCompletionSource<BaseCommand> 
responseTcs, CancellationToken cancellationToken);
     Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken 
cancellationToken);
     Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, 
CancellationToken cancellationToken);
-
-    Task<IConnection> WaitForInactive();
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs 
b/src/DotPulsar/Internal/ChannelManager.cs
index 87a1fa3..ddfa99a 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -14,15 +14,18 @@
 
 namespace DotPulsar.Internal;
 
+using DotPulsar.Abstractions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Buffers;
+using System.Threading;
 using System.Threading.Tasks;
 
-public sealed class ChannelManager : IDisposable
+public sealed class ChannelManager : IState<ChannelManagerState>, IDisposable
 {
+    private readonly StateManager<ChannelManagerState> _stateManager;
     private readonly RequestResponseHandler _requestResponseHandler;
     private readonly IdLookup<IChannel> _consumerChannels;
     private readonly IdLookup<IChannel> _producerChannels;
@@ -30,6 +33,7 @@ public sealed class ChannelManager : IDisposable
 
     public ChannelManager()
     {
+        _stateManager = new 
StateManager<ChannelManagerState>(ChannelManagerState.Inactive, 
ChannelManagerState.Closed);
         _requestResponseHandler = new RequestResponseHandler();
         _consumerChannels = new IdLookup<IChannel>();
         _producerChannels = new IdLookup<IChannel>();
@@ -40,12 +44,9 @@ public sealed class ChannelManager : IDisposable
         _incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => 
Incoming(cmd.ReachedEndOfTopic));
     }
 
-    public bool HasChannels()
-        => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
-
     public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel 
channel)
     {
-        var producerId = _producerChannels.Add(channel);
+        var producerId = AddProducerChannel(channel);
         command.ProducerId = producerId;
         var response = _requestResponseHandler.Outgoing(command);
 
@@ -53,7 +54,7 @@ public sealed class ChannelManager : IDisposable
         {
             if (result.Result.CommandType == BaseCommand.Type.Error)
             {
-                _ = _producerChannels.Remove(producerId);
+                _ = RemoveProducerChannel(producerId);
                 result.Result.Error.Throw();
             }
 
@@ -73,7 +74,7 @@ public sealed class ChannelManager : IDisposable
 
     public Task<SubscribeResponse> Outgoing(CommandSubscribe command, IChannel 
channel)
     {
-        var consumerId = _consumerChannels.Add(channel);
+        var consumerId = AddConsumerChannel(channel);
         command.ConsumerId = consumerId;
         var response = _requestResponseHandler.Outgoing(command);
 
@@ -81,7 +82,7 @@ public sealed class ChannelManager : IDisposable
         {
             if (result.Result.CommandType == BaseCommand.Type.Error)
             {
-                _ = _consumerChannels.Remove(consumerId);
+                _ = RemoveConsumerChannel(consumerId);
                 result.Result.Error.Throw();
             }
 
@@ -105,7 +106,7 @@ public sealed class ChannelManager : IDisposable
         _ = response.ContinueWith(result =>
         {
             if (result.Result.CommandType == BaseCommand.Type.Success)
-                _ = _consumerChannels.Remove(consumerId);
+                _ = RemoveConsumerChannel(consumerId);
         }, TaskContinuationOptions.OnlyOnRanToCompletion);
 
         return response;
@@ -125,7 +126,7 @@ public sealed class ChannelManager : IDisposable
         _ = response.ContinueWith(result =>
         {
             if (result.Result.CommandType == BaseCommand.Type.Success)
-                _ = _producerChannels.Remove(producerId);
+                _ = RemoveProducerChannel(producerId);
         }, TaskContinuationOptions.OnlyOnRanToCompletion);
 
         return response;
@@ -145,7 +146,7 @@ public sealed class ChannelManager : IDisposable
         _ = response.ContinueWith(result =>
         {
             if (result.Result.CommandType == BaseCommand.Type.Success)
-                _consumerChannels.Remove(consumerId)?.Unsubscribed();
+                RemoveConsumerChannel(consumerId)?.Unsubscribed();
         }, TaskContinuationOptions.OnlyOnRanToCompletion);
 
         return response;
@@ -195,6 +196,7 @@ public sealed class ChannelManager : IDisposable
 
     public void Dispose()
     {
+        _stateManager.SetState(ChannelManagerState.Closed);
         _requestResponseHandler.Dispose();
 
         foreach (var channel in _consumerChannels.RemoveAll())
@@ -209,7 +211,7 @@ public sealed class ChannelManager : IDisposable
 
     private void Incoming(CommandCloseConsumer command)
     {
-        var channel = _consumerChannels.Remove(command.ConsumerId);
+        var channel = RemoveConsumerChannel(command.ConsumerId);
         if (channel is null)
             return;
 
@@ -219,7 +221,7 @@ public sealed class ChannelManager : IDisposable
 
     private void Incoming(CommandCloseProducer command)
     {
-        var channel = _producerChannels.Remove(command.ProducerId);
+        var channel = RemoveProducerChannel(command.ProducerId);
         if (channel is null)
             return;
 
@@ -275,4 +277,42 @@ public sealed class ChannelManager : IDisposable
             successAction.Invoke(response.Result.ProducerSuccess.TopicEpoch);
         });
     }
+
+    private ulong AddProducerChannel(IChannel channel) => 
AddChannel(_producerChannels, channel);
+
+    private ulong AddConsumerChannel(IChannel channel) => 
AddChannel(_consumerChannels, channel);
+
+    private ulong AddChannel(IdLookup<IChannel> lookup, IChannel channel)
+    {
+        var id = lookup.Add(channel);
+        _stateManager.SetState(ChannelManagerState.Active);
+        return id;
+    }
+
+    private IChannel? RemoveProducerChannel(ulong producerId) => 
RemoveChannel(_producerChannels, producerId);
+
+    private IChannel? RemoveConsumerChannel(ulong consumerId) => 
RemoveChannel(_consumerChannels, consumerId);
+
+    private IChannel? RemoveChannel(IdLookup<IChannel> lookup, ulong id)
+    {
+        var channel = lookup.Remove(id);
+        ChannelRemoved();
+        return channel;
+    }
+
+    private void ChannelRemoved()
+    {
+        if (_consumerChannels.IsEmpty() && _producerChannels.IsEmpty())
+            _stateManager.SetState(ChannelManagerState.Inactive);
+    }
+
+    public bool IsFinalState() => _stateManager.IsFinalState();
+
+    public bool IsFinalState(ChannelManagerState state) => 
_stateManager.IsFinalState(state);
+
+    public async ValueTask<ChannelManagerState> 
OnStateChangeTo(ChannelManagerState state, CancellationToken cancellationToken 
= default)
+        => await _stateManager.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+
+    public async ValueTask<ChannelManagerState> 
OnStateChangeFrom(ChannelManagerState state, CancellationToken 
cancellationToken = default)
+        => await _stateManager.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 }
diff --git a/src/DotPulsar/Internal/ChannelManagerState.cs 
b/src/DotPulsar/Internal/ChannelManagerState.cs
new file mode 100644
index 0000000..ba91ecb
--- /dev/null
+++ b/src/DotPulsar/Internal/ChannelManagerState.cs
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+public enum ChannelManagerState : byte
+{
+    Active,
+    Inactive,
+    Closed
+}
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 952ece9..1b72be8 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Exceptions;
 using DotPulsar.Internal.Extensions;
@@ -26,37 +27,58 @@ using System.Threading.Tasks;
 
 public sealed class Connection : IConnection
 {
+    private readonly StateManager<ConnectionState> _stateManager;
     private readonly AsyncLock _lock;
+    private readonly CancellationTokenSource _cancellationTokenSource;
     private readonly ChannelManager _channelManager;
     private readonly PingPongHandler _pingPongHandler;
     private readonly IPulsarStream _stream;
     private readonly IAuthentication? _authentication;
+    private readonly TimeSpan _closeOnInactiveInterval;
     private int _isDisposed;
-    private readonly TaskCompletionSource<IConnection> _inactiveTaskSource;
 
-    public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, 
IAuthentication? authentication)
+    private Connection(IPulsarStream stream, IAuthentication? authentication, 
TimeSpan keepAliveInterval, TimeSpan closeOnInactiveInterval)
     {
+        _stateManager = new 
StateManager<ConnectionState>(ConnectionState.Connected, 
ConnectionState.Disconnected, ConnectionState.Closed);
         _lock = new AsyncLock();
+        _cancellationTokenSource = new CancellationTokenSource();
         _channelManager = new ChannelManager();
-        _inactiveTaskSource = new TaskCompletionSource<IConnection>();
-        _pingPongHandler = new PingPongHandler(this, keepAliveInterval, () =>
-        {
-            _inactiveTaskSource.TrySetResult(this);
-        });
+        _pingPongHandler = new PingPongHandler(keepAliveInterval);
         _stream = stream;
         _authentication = authentication;
+        _closeOnInactiveInterval = closeOnInactiveInterval;
+        _ = Task.Factory.StartNew(() => Setup(_cancellationTokenSource.Token));
     }
 
-    public async ValueTask<bool> HasChannels(CancellationToken 
cancellationToken)
+    public static async Task<Connection> Connect(
+        IPulsarStream stream,
+        IAuthentication? authentication,
+        CommandConnect commandConnect, 
+        TimeSpan keepAliveInterval,
+        TimeSpan closeOnInactiveInterval,
+        CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
+        Connection? connection = null;
 
-        using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+        try
+        {
+            connection = new Connection(stream, authentication, 
keepAliveInterval, closeOnInactiveInterval);
+            var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
+            response.Expect(BaseCommand.Type.Connected);
+            connection.MaxMessageSize = response.Connected.MaxMessageSize;
+            DotPulsarMeter.ConnectionCreated();
+            return connection;
+        }
+        catch
         {
-            return _channelManager.HasChannels();
+            if (connection is not null)
+                await connection.DisposeAsync().ConfigureAwait(false);
+            throw;
         }
     }
 
+    public int MaxMessageSize { get; private set; }
+
     public async Task<ProducerResponse> Send(CommandProducer command, IChannel 
channel, CancellationToken cancellationToken)
     {
         ThrowIfDisposed();
@@ -91,8 +113,6 @@ public sealed class Connection : IConnection
 
     private async Task Send(CommandAuthResponse command, CancellationToken 
cancellationToken)
     {
-        await Task.Yield();
-
         if (_authentication is not null)
         {
             command.Response ??= new AuthData();
@@ -300,10 +320,32 @@ public sealed class Connection : IConnection
         }
     }
 
-    public async Task ProcessIncomingFrames(CancellationToken 
cancellationToken)
+    private async Task Setup(CancellationToken cancellationToken)
+    {
+        var incoming = ProcessIncomingFrames(cancellationToken);
+        var channelManager = 
_channelManager.OnStateChangeTo(ChannelManagerState.Inactive, 
_closeOnInactiveInterval, cancellationToken).AsTask();
+        var pingPongTimeOut = 
_pingPongHandler.OnStateChangeTo(PingPongHandlerState.TimedOut, 
cancellationToken).AsTask();
+        _ = Task.Factory.StartNew(async () => await 
KeepAlive(PingPongHandlerState.Active, 
cancellationToken).ConfigureAwait(false));
+        await Task.WhenAny(incoming, channelManager, 
pingPongTimeOut).ConfigureAwait(false);
+        _stateManager.SetState(ConnectionState.Disconnected);
+    }
+
+    private async Task KeepAlive(PingPongHandlerState state, CancellationToken 
cancellationToken)
     {
-        await Task.Yield();
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            state = await _pingPongHandler.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
+            if (state == PingPongHandlerState.TimedOut)
+                return;
+            if (state == PingPongHandlerState.Active)
+                continue;
+            if (state == PingPongHandlerState.ThresholdExceeded)
+                await Send(new CommandPing(), 
cancellationToken).ConfigureAwait(false);
+        }
+    }
 
+    private async Task ProcessIncomingFrames(CancellationToken 
cancellationToken)
+    {
         try
         {
             await foreach (var frame in _stream.Frames(cancellationToken))
@@ -311,13 +353,16 @@ public sealed class Connection : IConnection
                 var commandSize = frame.ReadUInt32(0, true);
                 var command = 
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
 
-                if (_pingPongHandler.Incoming(command.CommandType))
-                    continue;
+                _pingPongHandler.Incoming(command.CommandType);
 
                 if (command.CommandType == BaseCommand.Type.Message)
                     _channelManager.Incoming(command.Message, new 
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
                 else if (command.CommandType == BaseCommand.Type.AuthChallenge)
-                    _ = Send(new CommandAuthResponse(), 
cancellationToken).ConfigureAwait(false);
+                    _ = Task.Factory.StartNew(async () => await Send(new 
CommandAuthResponse(), cancellationToken).ConfigureAwait(false));
+                else if (command.CommandType == BaseCommand.Type.Ping)
+                    _ = Task.Factory.StartNew(async () => await Send(new 
CommandPong(), cancellationToken).ConfigureAwait(false));
+                else if (command.CommandType == BaseCommand.Type.Pong)
+                    continue;
                 else
                     _channelManager.Incoming(command);
             }
@@ -333,20 +378,29 @@ public sealed class Connection : IConnection
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
+        DotPulsarMeter.ConnectionDisposed();
+
+        _stateManager.SetState(ConnectionState.Closed);
+        _cancellationTokenSource.Cancel();
         await _pingPongHandler.DisposeAsync().ConfigureAwait(false);
         await _lock.DisposeAsync().ConfigureAwait(false);
         _channelManager.Dispose();
         await _stream.DisposeAsync().ConfigureAwait(false);
     }
 
-    public Task<IConnection> WaitForInactive()
-    {
-        return _inactiveTaskSource.Task;
-    }
-
     private void ThrowIfDisposed()
     {
         if (_isDisposed != 0)
             throw new ConnectionDisposedException();
     }
+
+    public bool IsFinalState() => _stateManager.IsFinalState();
+
+    public bool IsFinalState(ConnectionState state) => 
_stateManager.IsFinalState(state);
+
+    public async ValueTask<ConnectionState> OnStateChangeTo(ConnectionState 
state, CancellationToken cancellationToken = default)
+        => await _stateManager.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+
+    public async ValueTask<ConnectionState> OnStateChangeFrom(ConnectionState 
state, CancellationToken cancellationToken = default)
+        => await _stateManager.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 }
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 939eef7..124b50d 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -34,8 +34,8 @@ public sealed class ConnectionPool : IConnectionPool
     private readonly EncryptionPolicy _encryptionPolicy;
     private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
     private readonly CancellationTokenSource _cancellationTokenSource;
-    private readonly Task _closeInactiveConnections;
     private readonly string? _listenerName;
+    private readonly TimeSpan _closeInactiveConnectionsInterval;
     private readonly TimeSpan _keepAliveInterval;
     private readonly IAuthentication? _authentication;
 
@@ -57,7 +57,7 @@ public sealed class ConnectionPool : IConnectionPool
         _listenerName = listenerName;
         _connections = new ConcurrentDictionary<PulsarUrl, Connection>();
         _cancellationTokenSource = new CancellationTokenSource();
-        _closeInactiveConnections = 
CloseInactiveConnections(closeInactiveConnectionsInterval, 
_cancellationTokenSource.Token);
+        _closeInactiveConnectionsInterval = closeInactiveConnectionsInterval;
         _keepAliveInterval = keepAliveInterval;
         _authentication = authentication;
     }
@@ -66,8 +66,6 @@ public sealed class ConnectionPool : IConnectionPool
     {
         _cancellationTokenSource.Cancel();
 
-        await _closeInactiveConnections.ConfigureAwait(false);
-
         await _lock.DisposeAsync().ConfigureAwait(false);
 
         foreach (var serviceUrl in _connections.Keys.ToArray())
@@ -159,28 +157,21 @@ public sealed class ConnectionPool : IConnectionPool
     private async Task<Connection> EstablishNewConnection(PulsarUrl url, 
CancellationToken cancellationToken)
     {
         var stream = await 
_connector.Connect(url.Physical).ConfigureAwait(false);
-        var connection = new Connection(new PulsarStream(stream), 
_keepAliveInterval, _authentication);
-        _ = connection.WaitForInactive().ContinueWith(async _ => await 
DisposeConnection(url).ConfigureAwait(false), cancellationToken);
-        DotPulsarMeter.ConnectionCreated();
-        _connections[url] = connection;
-        _ = 
connection.ProcessIncomingFrames(_cancellationTokenSource.Token).ContinueWith(t 
=> DisposeConnection(url));
-        var commandConnect = _commandConnect;
 
+        var commandConnect = _commandConnect;
         if (url.ProxyThroughServiceUrl)
             commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
-        var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
-        response.Expect(BaseCommand.Type.Connected);
+        var connection = await Connection.Connect(new PulsarStream(stream), 
_authentication, commandConnect, _keepAliveInterval, 
_closeInactiveConnectionsInterval, cancellationToken).ConfigureAwait(false);
+        _connections[url] = connection;
+        _ = 
connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t 
=> DisposeConnection(url));
         return connection;
     }
 
     private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
     {
         if (_connections.TryRemove(serviceUrl, out var connection) && 
connection is not null)
-        {
             await connection.DisposeAsync().ConfigureAwait(false);
-            DotPulsarMeter.ConnectionDisposed();
-        }
     }
 
     private static CommandConnect WithProxyToBroker(CommandConnect 
commandConnect, Uri logicalUrl)
@@ -200,35 +191,6 @@ public sealed class ConnectionPool : IConnectionPool
         };
     }
 
-    private async Task CloseInactiveConnections(TimeSpan interval, 
CancellationToken cancellationToken)
-    {
-        while (!cancellationToken.IsCancellationRequested)
-        {
-            try
-            {
-                await Task.Delay(interval, 
cancellationToken).ConfigureAwait(false);
-
-                using (await 
_lock.Lock(cancellationToken).ConfigureAwait(false))
-                {
-                    var serviceUrls = _connections.Keys;
-                    foreach (var serviceUrl in serviceUrls)
-                    {
-                        var connection = _connections[serviceUrl];
-                        if (connection is null)
-                            continue;
-
-                        if (!await 
connection.HasChannels(cancellationToken).ConfigureAwait(false))
-                            await 
DisposeConnection(serviceUrl).ConfigureAwait(false);
-                    }
-                }
-            }
-            catch
-            {
-                // ignored
-            }
-        }
-    }
-
     private sealed class PulsarUrl : IEquatable<PulsarUrl>
     {
         public PulsarUrl(Uri physical, Uri logical)
diff --git a/src/DotPulsar/Internal/ConnectionState.cs 
b/src/DotPulsar/Internal/ConnectionState.cs
new file mode 100644
index 0000000..d1d693e
--- /dev/null
+++ b/src/DotPulsar/Internal/ConnectionState.cs
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+public enum ConnectionState : byte
+{
+    Connected,
+    Disconnected,
+    Closed
+}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index 0fd670f..c89a3fb 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,48 +14,34 @@
 
 namespace DotPulsar.Internal;
 
-using DotPulsar.Internal.Abstractions;
+using DotPulsar.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 
-public sealed class PingPongHandler : IAsyncDisposable
+public sealed class PingPongHandler : IState<PingPongHandlerState>, 
IAsyncDisposable
 {
-    private readonly IConnection _connection;
+    private readonly StateManager<PingPongHandlerState> _stateManager;
     private readonly TimeSpan _keepAliveInterval;
     private readonly Timer _timer;
-    private readonly CommandPing _ping;
-    private readonly CommandPong _pong;
     private long _lastCommand;
     private bool _waitForPong;
-    private readonly Action _inactiveCallback;
 
-    public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval, 
Action inactiveCallback)
+    public PingPongHandler(TimeSpan keepAliveInterval)
     {
-        _connection = connection;
+        _stateManager = new 
StateManager<PingPongHandlerState>(PingPongHandlerState.Active, 
PingPongHandlerState.TimedOut, PingPongHandlerState.Closed);
         _keepAliveInterval = keepAliveInterval;
         _timer = new Timer(Watch);
         _timer.Change(_keepAliveInterval, TimeSpan.Zero);
-        _ping = new CommandPing();
-        _pong = new CommandPong();
         _lastCommand = Stopwatch.GetTimestamp();
-        _inactiveCallback = inactiveCallback;
     }
 
-    public bool Incoming(BaseCommand.Type commandType)
+    public void Incoming(BaseCommand.Type _)
     {
         Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());
         _waitForPong = false;
-
-        if (commandType == BaseCommand.Type.Ping)
-        {
-            Task.Factory.StartNew(() => SendPong());
-            return true;
-        }
-
-        return commandType == BaseCommand.Type.Pong;
     }
 
     private void Watch(object? state)
@@ -65,22 +51,23 @@ public sealed class PingPongHandler : IAsyncDisposable
             var lastCommand = Interlocked.Read(ref _lastCommand);
             var now = Stopwatch.GetTimestamp();
             var elapsed = TimeSpan.FromSeconds((now - lastCommand) / 
Stopwatch.Frequency);
-            if (elapsed >= _keepAliveInterval)
+            if (elapsed > _keepAliveInterval)
             {
                 if (_waitForPong)
                 {
-                    _inactiveCallback();
+                    _stateManager.SetState(PingPongHandlerState.TimedOut);
                     return;
                 }
-                Task.Factory.StartNew(() =>
-                {
-                    _waitForPong = true;
-                    return SendPing();
-                });
+
+                _waitForPong = true;
+                _stateManager.SetState(PingPongHandlerState.ThresholdExceeded);
                 _timer.Change(_keepAliveInterval, TimeSpan.Zero);
             }
             else
+            {
+                _stateManager.SetState(PingPongHandlerState.Active);
                 _timer.Change(_keepAliveInterval.Subtract(elapsed), 
TimeSpan.Zero);
+            }
         }
         catch
         {
@@ -88,34 +75,28 @@ public sealed class PingPongHandler : IAsyncDisposable
         }
     }
 
-    private async Task SendPing()
-    {
-        try
-        {
-            await _connection.Send(_ping, default).ConfigureAwait(false);
-        }
-        catch { }
-    }
-
-    private async Task SendPong()
-    {
-        try
-        {
-            await _connection.Send(_pong, default).ConfigureAwait(false);
-        }
-        catch { }
-    }
-
 #if NETSTANDARD2_0
     public ValueTask DisposeAsync()
     {
         _timer.Dispose();
+        _stateManager.SetState(PingPongHandlerState.Closed);
         return new ValueTask();
     }
 #else
     public async ValueTask DisposeAsync()
     {
         await _timer.DisposeAsync().ConfigureAwait(false);
+        _stateManager.SetState(PingPongHandlerState.Closed);
     }
 #endif
+
+    public bool IsFinalState() => _stateManager.IsFinalState();
+
+    public bool IsFinalState(PingPongHandlerState state) => 
_stateManager.IsFinalState(state);
+
+    public async ValueTask<PingPongHandlerState> 
OnStateChangeTo(PingPongHandlerState state, CancellationToken cancellationToken 
= default)
+        => await _stateManager.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+
+    public async ValueTask<PingPongHandlerState> 
OnStateChangeFrom(PingPongHandlerState state, CancellationToken 
cancellationToken = default)
+        => await _stateManager.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 }
diff --git a/src/DotPulsar/Internal/PingPongHandlerState.cs 
b/src/DotPulsar/Internal/PingPongHandlerState.cs
new file mode 100644
index 0000000..ca945d6
--- /dev/null
+++ b/src/DotPulsar/Internal/PingPongHandlerState.cs
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+public enum PingPongHandlerState : byte
+{
+    Active,
+    ThresholdExceeded,
+    TimedOut,
+    Closed
+}
diff --git a/src/DotPulsar/Internal/SubConsumer.cs 
b/src/DotPulsar/Internal/SubConsumer.cs
index 1c1d6dd..3e56fdf 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -86,7 +86,7 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        _eventRegister.Register(new ProducerDisposed(_correlationId));
+        _eventRegister.Register(new ConsumerDisposed(_correlationId));
         await DisposeChannel().ConfigureAwait(false);
     }
 
diff --git a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs 
b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
index 42ae214..0318588 100644
--- a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
+++ b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
@@ -15,13 +15,8 @@
 namespace DotPulsar.Tests.Internal;
 
 using DotPulsar.Internal;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
 using FluentAssertions;
-using NSubstitute;
-using NSubstitute.ClearExtensions;
 using System;
-using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 
@@ -29,19 +24,47 @@ using Xunit;
 public class PingPongHandlerTest
 {
     [Fact]
-    public async Task Watch_GivenConnectionNotAlive_ShouldDisposeConnection()
+    public async Task 
Constructor_GivenNoIncomingCommands_ShouldChangeStateToTimedOut()
     {
-        var connection = Substitute.For<IConnection>();
-        var keepAliveInterval = TimeSpan.FromSeconds(1);
-        var isActive = true;
-        var pingPongHandler = new PingPongHandler(connection, 
keepAliveInterval, () => isActive = false);
-
-        connection.When(c => c.Send(Arg.Any<CommandPing>(), 
Arg.Any<CancellationToken>())).Do(c => 
pingPongHandler.Incoming(BaseCommand.Type.Pong));
-        await Task.Delay(3 * keepAliveInterval);
-        isActive.Should().BeTrue();
-
-        connection.ClearSubstitute();
-        await Task.Delay(3 * keepAliveInterval);
-        isActive.Should().BeFalse();
+        // Arrange
+        var expected = PingPongHandlerState.TimedOut;
+        var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
+
+        // Act
+        var actual = await uut.OnStateChangeTo(PingPongHandlerState.TimedOut);
+
+        // Assert
+        actual.Should().Be(expected);
+    }
+
+    [Fact]
+    public async Task 
Incoming_GivenIncomingCommandAfterThresholdExceeded_ShouldChangeStateToActive()
+    {
+        // Arrange
+        var expected = PingPongHandlerState.Active;
+        var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
+
+        // Act
+        _ = await uut.OnStateChangeTo(PingPongHandlerState.ThresholdExceeded);
+        uut.Incoming(DotPulsar.Internal.PulsarApi.BaseCommand.Type.Ack);
+        var actual = await uut.OnStateChangeTo(PingPongHandlerState.Active);
+
+        // Assert
+        actual.Should().Be(expected);
+    }
+
+    [Fact]
+    public async Task 
Dispose_GivenTheStateWasActive_ShouldChangeStateToClosed()
+    {
+        // Arrange
+        var expected = PingPongHandlerState.Closed;
+        var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
+
+        // Act
+        var actual = uut.OnStateChangeTo(PingPongHandlerState.Closed);
+        await uut.DisposeAsync();
+
+        // Assert
+        (await actual).Should().Be(expected);
     }
 }

Reply via email to