This is an automated email from the ASF dual-hosted git repository. florianhockmann pushed a commit to branch TINKERPOP-2358 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit e411d85f8d9f1b8201586a54ee96ce06a016ca85 Author: Florian Hockmann <[email protected]> AuthorDate: Thu Jun 3 15:18:58 2021 +0200 TINKERPOP-2358 Stop leaking connections on Dispose If the `ConnectionPool` was disposed while it was in parallel creating new connections (e.g., to replace closed connections), those connections could be leaked. `Dispose()` could not dispose them yet as they were not completely established so they could be added to the pool. We now check after creating new connections whether the pool has been disposed in the meantime and then dispose these connections directly again. Simply checking if the pool was already disposed after having created new connections and then closing them directly in that case solves this problem. In addition to that, the pool now also has a `CancellationTokenSource` which allows us to cancel all active creations of new connections in `Dispose()`. So we don't have to wait until they are created only so we can then dispose them if we can already cancel the connection establishment. --- CHANGELOG.asciidoc | 1 + .../src/Gremlin.Net/Driver/Connection.cs | 4 +- .../src/Gremlin.Net/Driver/ConnectionPool.cs | 32 ++++-- .../src/Gremlin.Net/Driver/IConnection.cs | 3 +- .../src/Gremlin.Net/Driver/ProxyConnection.cs | 5 +- .../src/Gremlin.Net/Driver/WebSocketConnection.cs | 4 +- .../Driver/ConnectionPoolTests.cs | 128 +++++++++++++++++++-- 7 files changed, 155 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 46fdeb1..c9a2d10 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -43,6 +43,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Fixed `NullPointerException` in `ResponseMessage` deserialization for GraphSON. * Enabled the Gremlin.Net driver to repair its connection pool after the server was temporarily unavailable. * Added the ability to supply a `HandshakeInterceptor` to a `Cluster` which will provide access to the initial HTTP request that establishes the websocket. +* Fixed a possible leakage of connections in the Gremlin.NET driver that could happen if Dispose() was called while the pool was creating connections. ==== Bugs diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs index 7436c3d..7188fd2 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs @@ -79,9 +79,9 @@ namespace Gremlin.Net.Driver _webSocketConnection = new WebSocketConnection(webSocketConfiguration); } - public async Task ConnectAsync() + public async Task ConnectAsync(CancellationToken cancellationToken) { - await _webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false); + await _webSocketConnection.ConnectAsync(_uri, cancellationToken).ConfigureAwait(false); BeginReceiving(); } diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs index 261504a..3c6bd5c 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs @@ -46,6 +46,7 @@ namespace Gremlin.Net.Driver private int _poolState; private const int PoolIdle = 0; private const int PoolPopulationInProgress = 1; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings settings) { @@ -130,23 +131,36 @@ namespace Gremlin.Net.Driver var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false); _connections.AddRange(createdConnections); } - catch (Exception) + catch (Exception e) { - // Dispose created connections if the connection establishment failed + // Dispose all connections that were already created foreach (var creationTask in connectionCreationTasks) { - if (!creationTask.IsFaulted) + if (creationTask.IsCompleted) creationTask.Result?.Dispose(); } - - throw; + throw e; + } + + if (_disposed) + { + await CloseAndRemoveAllConnectionsAsync().ConfigureAwait(false); } } private async Task<IConnection> CreateNewConnectionAsync() { var newConnection = _connectionFactory.CreateConnection(); - await newConnection.ConnectAsync().ConfigureAwait(false); + try + { + await newConnection.ConnectAsync(_cts.Token).ConfigureAwait(false); + } + catch (Exception e) + { + // Dispose created connection if the connection establishment failed + newConnection.Dispose(); + throw e; + } return newConnection; } @@ -216,7 +230,7 @@ namespace Gremlin.Net.Driver { var poolWasPopulated = await EnsurePoolIsHealthyAsync().ConfigureAwait(false); // Another connection could have been removed already, check if another population is necessary - if (poolWasPopulated) + if (poolWasPopulated && !_disposed) await ReplaceClosedConnectionsAsync().ConfigureAwait(false); } @@ -260,7 +274,11 @@ namespace Gremlin.Net.Driver if (!_disposed) { if (disposing) + { + _cts.Cancel(); CloseAndRemoveAllConnectionsAsync().WaitUnwrap(); + _cts.Dispose(); + } _disposed = true; } } diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs index 7d29571..f9df79b 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs @@ -23,6 +23,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Gremlin.Net.Driver.Messages; @@ -30,7 +31,7 @@ namespace Gremlin.Net.Driver { internal interface IConnection : IDisposable { - Task ConnectAsync(); + Task ConnectAsync(CancellationToken cancellationToken); Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage); int NrRequestsInFlight { get; } bool IsOpen { get; } diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs index 41c5c4a..d8d443a 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs @@ -23,6 +23,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Gremlin.Net.Driver.Messages; @@ -39,9 +40,9 @@ namespace Gremlin.Net.Driver _releaseAction = releaseAction; } - public async Task ConnectAsync() + public async Task ConnectAsync(CancellationToken cancellationToken) { - await ProxiedConnection.ConnectAsync().ConfigureAwait(false); + await ProxiedConnection.ConnectAsync(cancellationToken).ConfigureAwait(false); } public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage) diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs index 1ea02dd..936de12 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs @@ -41,9 +41,9 @@ namespace Gremlin.Net.Driver webSocketConfiguration?.Invoke(_client.Options); } - public async Task ConnectAsync(Uri uri) + public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken) { - await _client.ConnectAsync(uri, CancellationToken.None).ConfigureAwait(false); + await _client.ConnectAsync(uri, cancellationToken).ConfigureAwait(false); } public async Task CloseAsync() diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs index fcc8eb5..156a149 100644 --- a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs +++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs @@ -23,6 +23,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Gremlin.Net.Driver; using Gremlin.Net.Driver.Exceptions; @@ -46,7 +47,7 @@ namespace Gremlin.Net.UnitTest.Driver Assert.Equal(poolSize, pool.NrConnections); mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize)); - mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize)); + mockedConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Exactly(poolSize)); } [Fact] @@ -235,34 +236,145 @@ namespace Gremlin.Net.UnitTest.Driver Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection()); } - private static IConnection OpenConnection + [Fact] + public void ShouldNotLeakConnectionsIfDisposeIsCalledWhilePoolIsPopulating() + { + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection); + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1); + var mockedConnectionToBeDisposed = new Mock<IConnection>(); + var poolWasDisposedSignal = new SemaphoreSlim(0, 1); + mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>())) + .Returns((CancellationToken _) => poolWasDisposedSignal.WaitAsync(CancellationToken.None)); + // We don't use the `CancellationToken` here as the connection should also be disposed if it did not + // react on the cancellation. This can happen if the task is cancelled just before `ConnectAsync` returns. + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object); + try + { + pool.GetAvailableConnection(); + } + catch (ServerUnavailableException) + { + // expected as the pool only contains a closed connection at this point + } + + pool.Dispose(); + poolWasDisposedSignal.Release(); + + Assert.Equal(0, pool.NrConnections); + mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once); + mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once); + } + + [Fact] + public void DisposeShouldCancelConnectionEstablishment() + { + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection); + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1, 0); + var mockedConnectionToBeDisposed = new Mock<IConnection>(); + mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>())) + .Returns((CancellationToken ct) => Task.Delay(-1, ct)); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object); + try + { + pool.GetAvailableConnection(); + } + catch (ServerUnavailableException) + { + // expected as the pool only contains a closed connection at this point + } + + pool.Dispose(); + + Assert.Equal(0, pool.NrConnections); + mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>())); + mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once); + } + + [Fact] + public async Task ConnectionsEstablishedInParallelShouldAllBeDisposedIfOneThrowsDuringCreation() + { + // This test unfortunately needs a lot of knowledge about the inner working of the ConnectionPool to + // adequately test that connections established in parallel will all be disposed if one throws an + // exception. + + // First create a pool with only closed connections that we can then let the pool replace: + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + fakeConnectionFactory.SetupSequence(m => m.CreateConnection()) + .Returns(ClosedConnection) // We need to do it like this as we use a dictionary of dead connections in + .Returns(ClosedConnection) // ConnectionPool and the three connections need to be different objects + .Returns(ClosedConnection);// for this to work. + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3, 0); + var startEstablishingProblematicConnections = new SemaphoreSlim(0, 1); + // Let the pool get one connection that is so slow to open that the pool will afterwards try to create two + // more connections in parallel. + var fakedSlowToEstablishConnection = new Mock<IConnection>(); + fakedSlowToEstablishConnection.Setup(m => m.ConnectAsync(It.IsAny<CancellationToken>())) + .Returns(startEstablishingProblematicConnections.WaitAsync); + fakeConnectionFactory.Setup(m => m.CreateConnection()) + .Returns(fakedSlowToEstablishConnection.Object); + // Trigger replacement of closed connections + try + { + pool.GetAvailableConnection(); + } + catch (ServerUnavailableException) + { + // expected as the pool only contain closed connections at this point + } + + var fakedOpenConnection = FakedOpenConnection; + var fakedCannotConnectConnection = FakedCannotConnectConnection; + fakeConnectionFactory.SetupSequence(m => m.CreateConnection()) + .Returns(fakedOpenConnection.Object) + .Returns(fakedCannotConnectConnection.Object); + // Let the slow to establish connection finish so the pool can try to establish the other two connections + startEstablishingProblematicConnections.Release(); + await Task.Delay(TimeSpan.FromMilliseconds(200)); + + // Verify that the pool tried to establish both connections and then also disposed both, even though one throw an exception + fakedOpenConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once()); + fakedOpenConnection.Verify(m => m.Dispose(), Times.Once); + fakedCannotConnectConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once); + fakedCannotConnectConnection.Verify(m => m.Dispose(), Times.Once); + } + + private static IConnection OpenConnection => FakedOpenConnection.Object; + + private static Mock<IConnection> FakedOpenConnection { get { var fakedConnection = new Mock<IConnection>(); fakedConnection.Setup(f => f.IsOpen).Returns(true); - return fakedConnection.Object; + return fakedConnection; } } + + private static IConnection ClosedConnection => FakedClosedConnection.Object; - private static IConnection ClosedConnection + private static Mock<IConnection> FakedClosedConnection { get { var fakedConnection = new Mock<IConnection>(); fakedConnection.Setup(f => f.IsOpen).Returns(false); - return fakedConnection.Object; + return fakedConnection; } } + + private static IConnection CannotConnectConnection => FakedCannotConnectConnection.Object; - private static IConnection CannotConnectConnection + private static Mock<IConnection> FakedCannotConnectConnection { get { var fakedConnection = new Mock<IConnection>(); fakedConnection.Setup(f => f.IsOpen).Returns(false); - fakedConnection.Setup(f => f.ConnectAsync()).Throws(new Exception("Cannot connect to server.")); - return fakedConnection.Object; + fakedConnection.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>())) + .Throws(new Exception("Cannot connect to server.")); + return fakedConnection; } }
