This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c5be84  Fix PingPongHandler doesn't kill the connection (#167)
1c5be84 is described below

commit 1c5be84d36e5ae160fbbceb185f06c4dab5b1edd
Author: Zike Yang <[email protected]>
AuthorDate: Fri Aug 25 04:39:17 2023 +0800

    Fix PingPongHandler doesn't kill the connection (#167)
    
    * Fix PingPongHandler doesn't kill the connection
    
    * Simplify the implementation of PingPongHandler
    
    * Add the unit test for PingPongHandler
    
    * Enhance ping pong handler to reduce the reaction time
    
    * Close inactive connection in the ConnectionPool
    
    * Improve test
    
    * Fix test
    
    * Refactor inactive connection disposing
    
    * Simplify the implementation
---
 src/DotPulsar/Internal/Abstractions/IConnection.cs |  2 +
 src/DotPulsar/Internal/Connection.cs               | 12 +++++-
 src/DotPulsar/Internal/ConnectionPool.cs           |  3 +-
 src/DotPulsar/Internal/PingPongHandler.cs          | 17 +++++++-
 .../Internal/PingPongHandlerTest.cs                | 47 ++++++++++++++++++++++
 5 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs 
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 14e6d9e..fb8ac1f 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -42,4 +42,6 @@ public interface IConnection : IAsyncDisposable
     Task Send(SendPackage command, TaskCompletionSource<BaseCommand> 
responseTcs, CancellationToken cancellationToken);
     Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken 
cancellationToken);
     Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, 
CancellationToken cancellationToken);
+
+    Task<IConnection> WaitForInactive();
 }
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 537eb78..4faae41 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -32,12 +32,17 @@ public sealed class Connection : IConnection
     private readonly IPulsarStream _stream;
     private readonly IAuthentication? _authentication;
     private int _isDisposed;
+    private readonly TaskCompletionSource<IConnection> _inactiveTaskSource;
 
     public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, 
IAuthentication? authentication)
     {
         _lock = new AsyncLock();
         _channelManager = new ChannelManager();
-        _pingPongHandler = new PingPongHandler(this, keepAliveInterval);
+        _inactiveTaskSource = new TaskCompletionSource<IConnection>();
+        _pingPongHandler = new PingPongHandler(this, keepAliveInterval, () =>
+        {
+            _inactiveTaskSource.TrySetResult(this);
+        });
         _stream = stream;
         _authentication = authentication;
     }
@@ -336,6 +341,11 @@ public sealed class Connection : IConnection
         await _stream.DisposeAsync().ConfigureAwait(false);
     }
 
+    public Task<IConnection> WaitForInactive()
+    {
+        return _inactiveTaskSource.Task;
+    }
+
     private void ThrowIfDisposed()
     {
         if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index f9139c5..3ea5a2f 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -160,6 +160,7 @@ public sealed class ConnectionPool : IConnectionPool
     {
         var stream = await 
_connector.Connect(url.Physical).ConfigureAwait(false);
         var connection = new Connection(new PulsarStream(stream), 
_keepAliveInterval, _authentication);
+        connection.WaitForInactive().ContinueWith(async t => { await 
DisposeConnection(url); }, cancellationToken);
         DotPulsarMeter.ConnectionCreated();
         _connections[url] = connection;
         _ = 
connection.ProcessIncomingFrames(_cancellationTokenSource.Token).ContinueWith(t 
=> DisposeConnection(url));
@@ -205,7 +206,7 @@ public sealed class ConnectionPool : IConnectionPool
         {
             try
             {
-                await Task.Delay(interval, 
cancellationToken).ConfigureAwait(false);
+                await Task.Delay(interval, cancellationToken);
 
                 using (await 
_lock.Lock(cancellationToken).ConfigureAwait(false))
                 {
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index 2e5602c..0fd670f 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -29,8 +29,10 @@ public sealed class PingPongHandler : IAsyncDisposable
     private readonly CommandPing _ping;
     private readonly CommandPong _pong;
     private long _lastCommand;
+    private bool _waitForPong;
+    private readonly Action _inactiveCallback;
 
-    public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval)
+    public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval, 
Action inactiveCallback)
     {
         _connection = connection;
         _keepAliveInterval = keepAliveInterval;
@@ -39,11 +41,13 @@ public sealed class PingPongHandler : IAsyncDisposable
         _ping = new CommandPing();
         _pong = new CommandPong();
         _lastCommand = Stopwatch.GetTimestamp();
+        _inactiveCallback = inactiveCallback;
     }
 
     public bool Incoming(BaseCommand.Type commandType)
     {
         Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());
+        _waitForPong = false;
 
         if (commandType == BaseCommand.Type.Ping)
         {
@@ -63,7 +67,16 @@ public sealed class PingPongHandler : IAsyncDisposable
             var elapsed = TimeSpan.FromSeconds((now - lastCommand) / 
Stopwatch.Frequency);
             if (elapsed >= _keepAliveInterval)
             {
-                Task.Factory.StartNew(() => SendPing());
+                if (_waitForPong)
+                {
+                    _inactiveCallback();
+                    return;
+                }
+                Task.Factory.StartNew(() =>
+                {
+                    _waitForPong = true;
+                    return SendPing();
+                });
                 _timer.Change(_keepAliveInterval, TimeSpan.Zero);
             }
             else
diff --git a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs 
b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
new file mode 100644
index 0000000..42ae214
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal;
+
+using DotPulsar.Internal;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using FluentAssertions;
+using NSubstitute;
+using NSubstitute.ClearExtensions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+[Trait("Category", "Unit")]
+public class PingPongHandlerTest
+{
+    [Fact]
+    public async Task Watch_GivenConnectionNotAlive_ShouldDisposeConnection()
+    {
+        var connection = Substitute.For<IConnection>();
+        var keepAliveInterval = TimeSpan.FromSeconds(1);
+        var isActive = true;
+        var pingPongHandler = new PingPongHandler(connection, 
keepAliveInterval, () => isActive = false);
+
+        connection.When(c => c.Send(Arg.Any<CommandPing>(), 
Arg.Any<CancellationToken>())).Do(c => 
pingPongHandler.Incoming(BaseCommand.Type.Pong));
+        await Task.Delay(3 * keepAliveInterval);
+        isActive.Should().BeTrue();
+
+        connection.ClearSubstitute();
+        await Task.Delay(3 * keepAliveInterval);
+        isActive.Should().BeFalse();
+    }
+}

Reply via email to