This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2358
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e411d85f8d9f1b8201586a54ee96ce06a016ca85
Author: Florian Hockmann <[email protected]>
AuthorDate: Thu Jun 3 15:18:58 2021 +0200

    TINKERPOP-2358 Stop leaking connections on Dispose
    
    If the `ConnectionPool` was disposed while it was in parallel creating
    new connections (e.g., to replace closed connections), those
    connections could be leaked. `Dispose()` could not dispose them yet as
    they were not completely established so they could be added to the pool.
    
    We now check after creating new connections whether the pool has been
    disposed in the meantime and then dispose these connections directly
    again.
    
    Simply checking if the pool was already disposed after having created
    new connections and then closing them directly in that case solves this
    problem.
    
    In addition to that, the pool now also has a `CancellationTokenSource`
    which allows us to cancel all active creations of new connections in
    `Dispose()`. So we don't have to wait until they are created only so we
    can then dispose them if we can already cancel the connection
    establishment.
---
 CHANGELOG.asciidoc                                 |   1 +
 .../src/Gremlin.Net/Driver/Connection.cs           |   4 +-
 .../src/Gremlin.Net/Driver/ConnectionPool.cs       |  32 ++++--
 .../src/Gremlin.Net/Driver/IConnection.cs          |   3 +-
 .../src/Gremlin.Net/Driver/ProxyConnection.cs      |   5 +-
 .../src/Gremlin.Net/Driver/WebSocketConnection.cs  |   4 +-
 .../Driver/ConnectionPoolTests.cs                  | 128 +++++++++++++++++++--
 7 files changed, 155 insertions(+), 22 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 46fdeb1..c9a2d10 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -43,6 +43,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Fixed `NullPointerException` in `ResponseMessage` deserialization for 
GraphSON.
 * Enabled the Gremlin.Net driver to repair its connection pool after the 
server was temporarily unavailable.
 * Added the ability to supply a `HandshakeInterceptor` to a `Cluster` which 
will provide access to the initial HTTP request that establishes the websocket.
+* Fixed a possible leakage of connections in the Gremlin.NET driver that could 
happen if Dispose() was called while the pool was creating connections.
 
 ==== Bugs
 
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
index 7436c3d..7188fd2 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
@@ -79,9 +79,9 @@ namespace Gremlin.Net.Driver
             _webSocketConnection = new 
WebSocketConnection(webSocketConfiguration);
         }
 
-        public async Task ConnectAsync()
+        public async Task ConnectAsync(CancellationToken cancellationToken)
         {
-            await 
_webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
+            await _webSocketConnection.ConnectAsync(_uri, 
cancellationToken).ConfigureAwait(false);
             BeginReceiving();
         }
 
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index 261504a..3c6bd5c 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -46,6 +46,7 @@ namespace Gremlin.Net.Driver
         private int _poolState;
         private const int PoolIdle = 0;
         private const int PoolPopulationInProgress = 1;
+        private readonly CancellationTokenSource _cts = new 
CancellationTokenSource();
 
         public ConnectionPool(IConnectionFactory connectionFactory, 
ConnectionPoolSettings settings)
         {
@@ -130,23 +131,36 @@ namespace Gremlin.Net.Driver
                 var createdConnections = await 
Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
                 _connections.AddRange(createdConnections);
             }
-            catch (Exception)
+            catch (Exception e)
             {
-                // Dispose created connections if the connection establishment 
failed
+                // Dispose all connections that were already created
                 foreach (var creationTask in connectionCreationTasks)
                 {
-                    if (!creationTask.IsFaulted)
+                    if (creationTask.IsCompleted)
                         creationTask.Result?.Dispose();
                 }
-
-                throw;
+                throw e;
+            }
+            
+            if (_disposed)
+            {
+                await 
CloseAndRemoveAllConnectionsAsync().ConfigureAwait(false);
             }
         }
 
         private async Task<IConnection> CreateNewConnectionAsync()
         {
             var newConnection = _connectionFactory.CreateConnection();
-            await newConnection.ConnectAsync().ConfigureAwait(false);
+            try
+            {
+                await 
newConnection.ConnectAsync(_cts.Token).ConfigureAwait(false);
+            }
+            catch (Exception e)
+            {
+                // Dispose created connection if the connection establishment 
failed
+                newConnection.Dispose();
+                throw e;
+            }
             return newConnection;
         }
 
@@ -216,7 +230,7 @@ namespace Gremlin.Net.Driver
         {
             var poolWasPopulated = await 
EnsurePoolIsHealthyAsync().ConfigureAwait(false);
             // Another connection could have been removed already, check if 
another population is necessary
-            if (poolWasPopulated)
+            if (poolWasPopulated && !_disposed)
                 await ReplaceClosedConnectionsAsync().ConfigureAwait(false);
         }
 
@@ -260,7 +274,11 @@ namespace Gremlin.Net.Driver
             if (!_disposed)
             {
                 if (disposing)
+                {
+                    _cts.Cancel();
                     CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
+                    _cts.Dispose();
+                }
                 _disposed = true;
             }
         }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
index 7d29571..f9df79b 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 
@@ -30,7 +31,7 @@ namespace Gremlin.Net.Driver
 {
     internal interface IConnection : IDisposable
     {
-        Task ConnectAsync();
+        Task ConnectAsync(CancellationToken cancellationToken);
         Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
         int NrRequestsInFlight { get; }
         bool IsOpen { get; }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
index 41c5c4a..d8d443a 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 
@@ -39,9 +40,9 @@ namespace Gremlin.Net.Driver
             _releaseAction = releaseAction;
         }
 
-        public async Task ConnectAsync()
+        public async Task ConnectAsync(CancellationToken cancellationToken)
         {
-            await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
+            await 
ProxiedConnection.ConnectAsync(cancellationToken).ConfigureAwait(false);
         }
 
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage)
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
index 1ea02dd..936de12 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
@@ -41,9 +41,9 @@ namespace Gremlin.Net.Driver
             webSocketConfiguration?.Invoke(_client.Options);
         }
 
-        public async Task ConnectAsync(Uri uri)
+        public async Task ConnectAsync(Uri uri, CancellationToken 
cancellationToken)
         {
-            await _client.ConnectAsync(uri, 
CancellationToken.None).ConfigureAwait(false);
+            await _client.ConnectAsync(uri, 
cancellationToken).ConfigureAwait(false);
         }
 
         public async Task CloseAsync()
diff --git 
a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs 
b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
index fcc8eb5..156a149 100644
--- a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver;
 using Gremlin.Net.Driver.Exceptions;
@@ -46,7 +47,7 @@ namespace Gremlin.Net.UnitTest.Driver
             
             Assert.Equal(poolSize, pool.NrConnections);
             mockedConnectionFactory.Verify(m => m.CreateConnection(), 
Times.Exactly(poolSize));
-            mockedConnection.Verify(m => m.ConnectAsync(), 
Times.Exactly(poolSize));
+            mockedConnection.Verify(m => 
m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Exactly(poolSize));
         }
 
         [Fact]
@@ -235,34 +236,145 @@ namespace Gremlin.Net.UnitTest.Driver
             Assert.Throws<ServerUnavailableException>(() => 
pool.GetAvailableConnection());
         }
 
-        private static IConnection OpenConnection
+        [Fact]
+        public void 
ShouldNotLeakConnectionsIfDisposeIsCalledWhilePoolIsPopulating()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
+            var mockedConnectionToBeDisposed = new Mock<IConnection>();
+            var poolWasDisposedSignal = new SemaphoreSlim(0, 1);
+            mockedConnectionToBeDisposed.Setup(f => 
f.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns((CancellationToken _) => 
poolWasDisposedSignal.WaitAsync(CancellationToken.None));
+            // We don't use the `CancellationToken` here as the connection 
should also be disposed if it did not
+            //  react on the cancellation. This can happen if the task is 
cancelled just before `ConnectAsync` returns.
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contains a closed connection at 
this point
+            }
+            
+            pool.Dispose();
+            poolWasDisposedSignal.Release();
+            
+            Assert.Equal(0, pool.NrConnections);
+            mockedConnectionToBeDisposed.Verify(m => 
m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
+            mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
+        }
+
+        [Fact]
+        public void DisposeShouldCancelConnectionEstablishment()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1, 
0);
+            var mockedConnectionToBeDisposed = new Mock<IConnection>();
+            mockedConnectionToBeDisposed.Setup(f => 
f.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns((CancellationToken ct) => Task.Delay(-1, ct));
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contains a closed connection at 
this point
+            }
+            
+            pool.Dispose();
+            
+            Assert.Equal(0, pool.NrConnections);
+            mockedConnectionToBeDisposed.Verify(m => 
m.ConnectAsync(It.IsAny<CancellationToken>()));
+            mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
+        }
+        
+        [Fact]
+        public async Task 
ConnectionsEstablishedInParallelShouldAllBeDisposedIfOneThrowsDuringCreation()
+        {
+            // This test unfortunately needs a lot of knowledge about the 
inner working of the ConnectionPool to 
+            //  adequately test that connections established in parallel will 
all be disposed if one throws an
+            //  exception.
+            
+            // First create a pool with only closed connections that we can 
then let the pool replace:
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+                .Returns(ClosedConnection) // We need to do it like this as we 
use a dictionary of dead connections in 
+                .Returns(ClosedConnection) //   ConnectionPool and the three 
connections need to be different objects
+                .Returns(ClosedConnection);//   for this to work.
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3, 
0);
+            var startEstablishingProblematicConnections = new SemaphoreSlim(0, 
1);
+            // Let the pool get one connection that is so slow to open that 
the pool will afterwards try to create two
+            //  more connections in parallel.
+            var fakedSlowToEstablishConnection = new Mock<IConnection>();
+            fakedSlowToEstablishConnection.Setup(m => 
m.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns(startEstablishingProblematicConnections.WaitAsync);
+            fakeConnectionFactory.Setup(m => m.CreateConnection())
+                .Returns(fakedSlowToEstablishConnection.Object);
+            // Trigger replacement of closed connections
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contain closed connections at 
this point
+            }
+            
+            var fakedOpenConnection = FakedOpenConnection;
+            var fakedCannotConnectConnection = FakedCannotConnectConnection;
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+                .Returns(fakedOpenConnection.Object)
+                .Returns(fakedCannotConnectConnection.Object);
+            // Let the slow to establish connection finish so the pool can try 
to establish the other two connections
+            startEstablishingProblematicConnections.Release();
+            await Task.Delay(TimeSpan.FromMilliseconds(200));
+            
+            // Verify that the pool tried to establish both connections and 
then also disposed both, even though one throw an exception
+            fakedOpenConnection.Verify(m => 
m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once());
+            fakedOpenConnection.Verify(m => m.Dispose(), Times.Once);
+            fakedCannotConnectConnection.Verify(m => 
m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
+            fakedCannotConnectConnection.Verify(m => m.Dispose(), Times.Once);
+        }
+
+        private static IConnection OpenConnection => 
FakedOpenConnection.Object;
+
+        private static Mock<IConnection> FakedOpenConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(true);
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }
+
+        private static IConnection ClosedConnection => 
FakedClosedConnection.Object;
         
-        private static IConnection ClosedConnection
+        private static Mock<IConnection> FakedClosedConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(false);
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }
+
+        private static IConnection CannotConnectConnection => 
FakedCannotConnectConnection.Object;
         
-        private static IConnection CannotConnectConnection
+        private static Mock<IConnection> FakedCannotConnectConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(false);
-                fakedConnection.Setup(f => f.ConnectAsync()).Throws(new 
Exception("Cannot connect to server."));
-                return fakedConnection.Object;
+                fakedConnection.Setup(f => 
f.ConnectAsync(It.IsAny<CancellationToken>()))
+                    .Throws(new Exception("Cannot connect to server."));
+                return fakedConnection;
             }
         }
 

Reply via email to