This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2135
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit c42621a666029bf65a364504a81d7ba74668b949
Author: Florian Hockmann <[email protected]>
AuthorDate: Sun Mar 3 12:32:06 2019 +0100

    TINKERPOP-2135 Fix for closed idle connections
    
    Connections that were closed while sitting idle in the pool are now
    correctly removed from the pool. The pool population method also had
    to change, because it must now be able to refill the pool when only
    some connections were removed. Previously, it assumed that the pool
    was either completely populated or completely empty.
---
 .../src/Gremlin.Net/Driver/Connection.cs           |  2 +-
 .../src/Gremlin.Net/Driver/ConnectionPool.cs       | 79 +++++++++++-----------
 2 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
index bd70da6..5a35465 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
@@ -79,7 +79,7 @@ namespace Gremlin.Net.Driver
 
         public int NrRequestsInFlight => _callbackByRequestId.Count;
 
-        public bool IsOpen => _webSocketConnection.IsOpen;
+        public bool IsOpen => _webSocketConnection.IsOpen && Volatile.Read(ref 
_connectionState) != Closed;
 
         public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
         {
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index 8557d01..40aa99b 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -34,12 +34,13 @@ namespace Gremlin.Net.Driver
     internal class ConnectionPool : IDisposable
     {
         private readonly ConnectionFactory _connectionFactory;
-        private readonly ConcurrentBag<Connection> _connections = new 
ConcurrentBag<Connection>();
+        private readonly ConcurrentDictionary<Connection, object> _connections 
=
+            new ConcurrentDictionary<Connection, object>();
         private readonly int _poolSize;
         private readonly int _maxInProcessPerConnection;
-        private int _nrConnections;
-        private const int PoolEmpty = 0;
-        private const int PoolPopulationInProgress = -1;
+        private int _poolState;
+        private const int PoolIdle = 0;
+        private const int PoolPopulationInProgress = 1;
 
         public ConnectionPool(ConnectionFactory connectionFactory, 
ConnectionPoolSettings settings)
         {
@@ -49,15 +50,8 @@ namespace Gremlin.Net.Driver
             PopulatePoolAsync().WaitUnwrap();
         }
         
-        public int NrConnections
-        {
-            get
-            {
-                var nrConnections = Interlocked.CompareExchange(ref 
_nrConnections, PoolEmpty, PoolEmpty);
-                return nrConnections < 0 ? 0 : nrConnections;
-            }
-        }
-        
+        public int NrConnections => _connections.Count;
+
         public async Task<IConnection> GetAvailableConnectionAsync()
         {
             await EnsurePoolIsPopulatedAsync().ConfigureAwait(false);
@@ -69,38 +63,34 @@ namespace Gremlin.Net.Driver
             // The pool could have been empty because of connection problems. 
So, we need to populate it again.
             while (true)
             {
-                var nrOpened = Interlocked.CompareExchange(ref _nrConnections, 
PoolEmpty, PoolEmpty);
-                if (nrOpened >= _poolSize) break;
-                if (nrOpened != PoolPopulationInProgress)
+                if (NrConnections >= _poolSize) break;
+                var poolState = Interlocked.CompareExchange(ref _poolState, 
PoolPopulationInProgress, PoolIdle);
+                if (poolState == PoolPopulationInProgress) continue;
+                try
                 {
                     await PopulatePoolAsync().ConfigureAwait(false);
                 }
+                finally
+                {
+                    // We need to remove the PoolPopulationInProgress flag 
again even if an exception occurred, so we don't block the pool population for 
ever
+                    Interlocked.CompareExchange(ref _poolState, PoolIdle, 
PoolPopulationInProgress);
+                }
             }
         }
         
         private async Task PopulatePoolAsync()
         {
-            var nrOpened = Interlocked.CompareExchange(ref _nrConnections, 
PoolPopulationInProgress, PoolEmpty);
-            if (nrOpened == PoolPopulationInProgress || nrOpened >= _poolSize) 
return;
-
-            try
+            var nrConnectionsToCreate = _poolSize - _connections.Count;
+            var connectionCreationTasks = new 
List<Task<Connection>>(nrConnectionsToCreate);
+            for (var i = 0; i < nrConnectionsToCreate; i++)
             {
-                var connectionCreationTasks = new 
List<Task<Connection>>(_poolSize);
-                for (var i = 0; i < _poolSize; i++)
-                {
-                    connectionCreationTasks.Add(CreateNewConnectionAsync());
-                }
-
-                var createdConnections = await 
Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
-                foreach (var c in createdConnections)
-                {
-                    _connections.Add(c);
-                }
+                connectionCreationTasks.Add(CreateNewConnectionAsync());
             }
-            finally
+
+            var createdConnections = await 
Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
+            foreach (var c in createdConnections)
             {
-                // We need to remove the PoolPopulationInProgress flag again 
even if an exception occurred, so we don't block the pool population for ever
-                Interlocked.CompareExchange(ref _nrConnections, 
_connections.Count, PoolPopulationInProgress);
+                _connections.TryAdd(c, null);
             }
         }
         
@@ -120,8 +110,7 @@ namespace Gremlin.Net.Driver
                     throw new ServerUnavailableException();
                 if (connection.NrRequestsInFlight >= 
_maxInProcessPerConnection)
                     throw new ConnectionPoolBusyException(_poolSize, 
_maxInProcessPerConnection);
-                if (connection.IsOpen) return connection;
-                DefinitelyDestroyConnection(connection);
+                return connection;
             }
         }
 
@@ -130,10 +119,16 @@ namespace Gremlin.Net.Driver
             if (_connections.IsEmpty) return null;
             var nrMinInFlightConnections = int.MaxValue;
             Connection leastBusy = null;
-            foreach (var connection in _connections)
+            
+            foreach (var connection in _connections.Keys)
             {
                 var nrInFlight = connection.NrRequestsInFlight;
                 if (nrInFlight >= nrMinInFlightConnections) continue;
+                if (!connection.IsOpen)
+                {
+                    RemoveConnectionFromPool(connection);
+                    continue;
+                }
                 nrMinInFlightConnections = nrInFlight;
                 leastBusy = connection;
             }
@@ -141,6 +136,12 @@ namespace Gremlin.Net.Driver
             return leastBusy;
         }
         
+        private void RemoveConnectionFromPool(Connection connection)
+        {
+            if (_connections.TryRemove(connection, out _))
+                DefinitelyDestroyConnection(connection);
+        }
+        
         private IConnection ProxiedConnection(Connection connection)
         {
             return new ProxyConnection(connection, ReturnConnectionIfOpen);
@@ -159,8 +160,9 @@ namespace Gremlin.Net.Driver
 
         private async Task CloseAndRemoveAllConnectionsAsync()
         {
-            while (_connections.TryTake(out var connection))
+            foreach (var connection in _connections.Keys)
             {
+                _connections.TryRemove(connection, out _);
                 await connection.CloseAsync().ConfigureAwait(false);
                 DefinitelyDestroyConnection(connection);
             }
@@ -169,7 +171,6 @@ namespace Gremlin.Net.Driver
         private void DefinitelyDestroyConnection(Connection connection)
         {
             connection.Dispose();
-            Interlocked.Decrement(ref _nrConnections);
         }
 
         #region IDisposable Support

Reply via email to