This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1c5be84 Fix PingPongHandler doesn't kill the connection (#167)
1c5be84 is described below
commit 1c5be84d36e5ae160fbbceb185f06c4dab5b1edd
Author: Zike Yang <[email protected]>
AuthorDate: Fri Aug 25 04:39:17 2023 +0800
Fix PingPongHandler doesn't kill the connection (#167)
* Fix PingPongHandler doesn't kill the connection
* Simplify the implementation of PingPongHandler
* Add the unit test for PingPongHandler
* Enhance ping pong handler to reduce the reaction time
* Close inactive connection in the ConnectionPool
* Improve test
* Fix test
* Refactor inactive connection disposing
* Simplify the implementation
---
src/DotPulsar/Internal/Abstractions/IConnection.cs | 2 +
src/DotPulsar/Internal/Connection.cs | 12 +++++-
src/DotPulsar/Internal/ConnectionPool.cs | 3 +-
src/DotPulsar/Internal/PingPongHandler.cs | 17 +++++++-
.../Internal/PingPongHandlerTest.cs | 47 ++++++++++++++++++++++
5 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 14e6d9e..fb8ac1f 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -42,4 +42,6 @@ public interface IConnection : IAsyncDisposable
Task Send(SendPackage command, TaskCompletionSource<BaseCommand>
responseTcs, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken
cancellationToken);
Task<BaseCommand> Send(CommandPartitionedTopicMetadata command,
CancellationToken cancellationToken);
+
+ Task<IConnection> WaitForInactive();
}
diff --git a/src/DotPulsar/Internal/Connection.cs
b/src/DotPulsar/Internal/Connection.cs
index 537eb78..4faae41 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -32,12 +32,17 @@ public sealed class Connection : IConnection
private readonly IPulsarStream _stream;
private readonly IAuthentication? _authentication;
private int _isDisposed;
+ private readonly TaskCompletionSource<IConnection> _inactiveTaskSource;
public Connection(IPulsarStream stream, TimeSpan keepAliveInterval,
IAuthentication? authentication)
{
_lock = new AsyncLock();
_channelManager = new ChannelManager();
- _pingPongHandler = new PingPongHandler(this, keepAliveInterval);
+ _inactiveTaskSource = new TaskCompletionSource<IConnection>();
+ _pingPongHandler = new PingPongHandler(this, keepAliveInterval, () =>
+ {
+ _inactiveTaskSource.TrySetResult(this);
+ });
_stream = stream;
_authentication = authentication;
}
@@ -336,6 +341,11 @@ public sealed class Connection : IConnection
await _stream.DisposeAsync().ConfigureAwait(false);
}
+ public Task<IConnection> WaitForInactive()
+ {
+ return _inactiveTaskSource.Task;
+ }
+
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs
b/src/DotPulsar/Internal/ConnectionPool.cs
index f9139c5..3ea5a2f 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -160,6 +160,7 @@ public sealed class ConnectionPool : IConnectionPool
{
var stream = await
_connector.Connect(url.Physical).ConfigureAwait(false);
var connection = new Connection(new PulsarStream(stream),
_keepAliveInterval, _authentication);
+ connection.WaitForInactive().ContinueWith(async t => { await
DisposeConnection(url); }, cancellationToken);
DotPulsarMeter.ConnectionCreated();
_connections[url] = connection;
_ =
connection.ProcessIncomingFrames(_cancellationTokenSource.Token).ContinueWith(t
=> DisposeConnection(url));
@@ -205,7 +206,7 @@ public sealed class ConnectionPool : IConnectionPool
{
try
{
- await Task.Delay(interval,
cancellationToken).ConfigureAwait(false);
+ await Task.Delay(interval, cancellationToken);
using (await
_lock.Lock(cancellationToken).ConfigureAwait(false))
{
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs
b/src/DotPulsar/Internal/PingPongHandler.cs
index 2e5602c..0fd670f 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -29,8 +29,10 @@ public sealed class PingPongHandler : IAsyncDisposable
private readonly CommandPing _ping;
private readonly CommandPong _pong;
private long _lastCommand;
+ private bool _waitForPong;
+ private readonly Action _inactiveCallback;
- public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval)
+ public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval,
Action inactiveCallback)
{
_connection = connection;
_keepAliveInterval = keepAliveInterval;
@@ -39,11 +41,13 @@ public sealed class PingPongHandler : IAsyncDisposable
_ping = new CommandPing();
_pong = new CommandPong();
_lastCommand = Stopwatch.GetTimestamp();
+ _inactiveCallback = inactiveCallback;
}
public bool Incoming(BaseCommand.Type commandType)
{
Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());
+ _waitForPong = false;
if (commandType == BaseCommand.Type.Ping)
{
@@ -63,7 +67,16 @@ public sealed class PingPongHandler : IAsyncDisposable
var elapsed = TimeSpan.FromSeconds((now - lastCommand) /
Stopwatch.Frequency);
if (elapsed >= _keepAliveInterval)
{
- Task.Factory.StartNew(() => SendPing());
+ if (_waitForPong)
+ {
+ _inactiveCallback();
+ return;
+ }
+ Task.Factory.StartNew(() =>
+ {
+ _waitForPong = true;
+ return SendPing();
+ });
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
}
else
diff --git a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
new file mode 100644
index 0000000..42ae214
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal;
+
+using DotPulsar.Internal;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using FluentAssertions;
+using NSubstitute;
+using NSubstitute.ClearExtensions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+[Trait("Category", "Unit")]
+public class PingPongHandlerTest
+{
+ [Fact]
+ public async Task Watch_GivenConnectionNotAlive_ShouldDisposeConnection()
+ {
+ var connection = Substitute.For<IConnection>();
+ var keepAliveInterval = TimeSpan.FromSeconds(1);
+ var isActive = true;
+ var pingPongHandler = new PingPongHandler(connection,
keepAliveInterval, () => isActive = false);
+
+ connection.When(c => c.Send(Arg.Any<CommandPing>(),
Arg.Any<CancellationToken>())).Do(c =>
pingPongHandler.Incoming(BaseCommand.Type.Pong));
+ await Task.Delay(3 * keepAliveInterval);
+ isActive.Should().BeTrue();
+
+ connection.ClearSubstitute();
+ await Task.Delay(3 * keepAliveInterval);
+ isActive.Should().BeFalse();
+ }
+}