This is an automated email from the ASF dual-hosted git repository. florianhockmann pushed a commit to branch TINKERPOP-2135 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit c42621a666029bf65a364504a81d7ba74668b949 Author: Florian Hockmann <[email protected]> AuthorDate: Sun Mar 3 12:32:06 2019 +0100 TINKERPOP-2135 Fix for closed idle connections Connections that were closed while sitting idle in the pool are now correctly removed from the pool. The pool population method also needed to be changed as it is now able to populate the pool again if only some connections were removed. The previous assumption here was that the pool is either completely populated or completely empty. --- .../src/Gremlin.Net/Driver/Connection.cs | 2 +- .../src/Gremlin.Net/Driver/ConnectionPool.cs | 79 +++++++++++----------- 2 files changed, 41 insertions(+), 40 deletions(-) diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs index bd70da6..5a35465 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs @@ -79,7 +79,7 @@ namespace Gremlin.Net.Driver public int NrRequestsInFlight => _callbackByRequestId.Count; - public bool IsOpen => _webSocketConnection.IsOpen; + public bool IsOpen => _webSocketConnection.IsOpen && Volatile.Read(ref _connectionState) != Closed; public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage) { diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs index 8557d01..40aa99b 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs @@ -34,12 +34,13 @@ namespace Gremlin.Net.Driver internal class ConnectionPool : IDisposable { private readonly ConnectionFactory _connectionFactory; - private readonly ConcurrentBag<Connection> _connections = new ConcurrentBag<Connection>(); + private readonly ConcurrentDictionary<Connection, object> _connections = + new ConcurrentDictionary<Connection, object>(); private readonly int _poolSize; private readonly int _maxInProcessPerConnection; - private int _nrConnections; - private const int PoolEmpty = 0; - private const int PoolPopulationInProgress = -1; + private int _poolState; + private const int PoolIdle = 0; + private const int PoolPopulationInProgress = 1; public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolSettings settings) { @@ -49,15 +50,8 @@ namespace Gremlin.Net.Driver PopulatePoolAsync().WaitUnwrap(); } - public int NrConnections - { - get - { - var nrConnections = Interlocked.CompareExchange(ref _nrConnections, PoolEmpty, PoolEmpty); - return nrConnections < 0 ? 0 : nrConnections; - } - } - + public int NrConnections => _connections.Count; + public async Task<IConnection> GetAvailableConnectionAsync() { await EnsurePoolIsPopulatedAsync().ConfigureAwait(false); @@ -69,38 +63,34 @@ namespace Gremlin.Net.Driver // The pool could have been empty because of connection problems. So, we need to populate it again. while (true) { - var nrOpened = Interlocked.CompareExchange(ref _nrConnections, PoolEmpty, PoolEmpty); - if (nrOpened >= _poolSize) break; - if (nrOpened != PoolPopulationInProgress) + if (NrConnections >= _poolSize) break; + var poolState = Interlocked.CompareExchange(ref _poolState, PoolPopulationInProgress, PoolIdle); + if (poolState == PoolPopulationInProgress) continue; + try { await PopulatePoolAsync().ConfigureAwait(false); } + finally + { + // We need to remove the PoolPopulationInProgress flag again even if an exception occurred, so we don't block the pool population for ever + Interlocked.CompareExchange(ref _poolState, PoolIdle, PoolPopulationInProgress); + } } } private async Task PopulatePoolAsync() { - var nrOpened = Interlocked.CompareExchange(ref _nrConnections, PoolPopulationInProgress, PoolEmpty); - if (nrOpened == PoolPopulationInProgress || nrOpened >= _poolSize) return; - - try + var nrConnectionsToCreate = _poolSize - _connections.Count; + var connectionCreationTasks = new List<Task<Connection>>(nrConnectionsToCreate); + for (var i = 0; i < nrConnectionsToCreate; i++) { - var connectionCreationTasks = new List<Task<Connection>>(_poolSize); - for (var i = 0; i < _poolSize; i++) - { - connectionCreationTasks.Add(CreateNewConnectionAsync()); - } - - var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false); - foreach (var c in createdConnections) - { - _connections.Add(c); - } + connectionCreationTasks.Add(CreateNewConnectionAsync()); } - finally + + var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false); + foreach (var c in createdConnections) { - // We need to remove the PoolPopulationInProgress flag again even if an exception occurred, so we don't block the pool population for ever - Interlocked.CompareExchange(ref _nrConnections, _connections.Count, PoolPopulationInProgress); + _connections.TryAdd(c, null); } } @@ -120,8 +110,7 @@ namespace Gremlin.Net.Driver throw new ServerUnavailableException(); if (connection.NrRequestsInFlight >= _maxInProcessPerConnection) throw new ConnectionPoolBusyException(_poolSize, _maxInProcessPerConnection); - if (connection.IsOpen) return connection; - DefinitelyDestroyConnection(connection); + return connection; } } @@ -130,10 +119,16 @@ namespace Gremlin.Net.Driver if (_connections.IsEmpty) return null; var nrMinInFlightConnections = int.MaxValue; Connection leastBusy = null; - foreach (var connection in _connections) + + foreach (var connection in _connections.Keys) { var nrInFlight = connection.NrRequestsInFlight; if (nrInFlight >= nrMinInFlightConnections) continue; + if (!connection.IsOpen) + { + RemoveConnectionFromPool(connection); + continue; + } nrMinInFlightConnections = nrInFlight; leastBusy = connection; } @@ -141,6 +136,12 @@ namespace Gremlin.Net.Driver return leastBusy; } + private void RemoveConnectionFromPool(Connection connection) + { + if (_connections.TryRemove(connection, out _)) + DefinitelyDestroyConnection(connection); + } + private IConnection ProxiedConnection(Connection connection) { return new ProxyConnection(connection, ReturnConnectionIfOpen); @@ -159,8 +160,9 @@ namespace Gremlin.Net.Driver private async Task CloseAndRemoveAllConnectionsAsync() { - while (_connections.TryTake(out var connection)) + foreach (var connection in _connections.Keys) { + _connections.TryRemove(connection, out _); await connection.CloseAsync().ConfigureAwait(false); DefinitelyDestroyConnection(connection); } @@ -169,7 +171,6 @@ namespace Gremlin.Net.Driver private void DefinitelyDestroyConnection(Connection connection) { connection.Dispose(); - Interlocked.Decrement(ref _nrConnections); } #region IDisposable Support
