This is an automated email from the ASF dual-hosted git repository. florianhockmann pushed a commit to branch TINKERPOP-2288 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 217018e890f355d32318daeb1d380978853b8394 Author: Florian Hockmann <[email protected]> AuthorDate: Tue Mar 10 16:27:42 2020 +0100 TINKERPOP-2288 Replace closed connections directly Closed connections are now replaced automatically in the background. If no open connection is available to answer a request, then the pool tries it again after some time. It uses a retry policy with exponential backoff for that, implemented with Polly. This change also ensures that only one task performs a pool resizing operation at a time. These changes should ensure that: - A connection is still returned quickly if one is available. - Closed connections are replaced immediately, without needing to wait for the next incoming request. - If the server is only unavailable temporarily (or it just closed open connections for some reason), then the user should not get an exception. He only has to wait until the connections are replaced. TODO: - Make the retry policy configurable. - Document changes. --- gremlin-dotnet/glv/Gremlin.Net.csproj.template | 5 +- .../src/Gremlin.Net/Driver/ConnectionFactory.cs | 4 +- .../src/Gremlin.Net/Driver/ConnectionPool.cs | 118 +++++++++---- .../src/Gremlin.Net/Driver/GremlinClient.cs | 2 +- .../src/Gremlin.Net/Driver/IConnection.cs | 4 + .../{IConnection.cs => IConnectionFactory.cs} | 9 +- .../src/Gremlin.Net/Driver/ProxyConnection.cs | 26 ++- gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj | 5 +- .../src/Gremlin.Net/Properties/AssemblyInfo.cs | 3 +- .../Driver/ConnectionPoolTests.cs | 193 +++++++++++++++++++++ 10 files changed, 315 insertions(+), 54 deletions(-) diff --git a/gremlin-dotnet/glv/Gremlin.Net.csproj.template b/gremlin-dotnet/glv/Gremlin.Net.csproj.template index a6a57a3..d7bac7e 100644 --- a/gremlin-dotnet/glv/Gremlin.Net.csproj.template +++ b/gremlin-dotnet/glv/Gremlin.Net.csproj.template @@ -63,11 +63,12 @@ NOTE that versions suffixed with "-rc" are considered release candidates (i.e. 
p <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/> <PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Microsoft.CSharp" Version="4.3.0" /> + <PackageReference Include="Polly" Version="7.2.0" /> </ItemGroup> <ItemGroup> - <None Include="../../LICENSE" Pack="true" PackagePath=""/> - <None Include="../../NOTICE" Pack="true" PackagePath=""/> + <None Include="../../LICENSE" Pack="true" PackagePath="" /> + <None Include="../../NOTICE" Pack="true" PackagePath="" /> </ItemGroup> </Project> diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs index 7a6c2d5..c59aba7 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs @@ -27,7 +27,7 @@ using Gremlin.Net.Structure.IO.GraphSON; namespace Gremlin.Net.Driver { - internal class ConnectionFactory + internal class ConnectionFactory : IConnectionFactory { private readonly GraphSONReader _graphSONReader; private readonly GraphSONWriter _graphSONWriter; @@ -45,7 +45,7 @@ namespace Gremlin.Net.Driver _webSocketConfiguration = webSocketConfiguration; } - public Connection CreateConnection() + public IConnection CreateConnection() { return new Connection(_gremlinServer.Uri, _gremlinServer.Username, _gremlinServer.Password, _graphSONReader, _graphSONWriter, _mimeType, _webSocketConfiguration); diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs index 34bc77f..50138f7 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs @@ -22,11 +22,13 @@ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Gremlin.Net.Driver.Exceptions; using 
Gremlin.Net.Process; +using Polly; namespace Gremlin.Net.Driver { @@ -34,8 +36,11 @@ namespace Gremlin.Net.Driver { private const int ConnectionIndexOverflowLimit = int.MaxValue - 1000000; - private readonly ConnectionFactory _connectionFactory; - private readonly CopyOnWriteCollection<Connection> _connections = new CopyOnWriteCollection<Connection>(); + private readonly IConnectionFactory _connectionFactory; + private readonly CopyOnWriteCollection<IConnection> _connections = new CopyOnWriteCollection<IConnection>(); + + private readonly ConcurrentDictionary<IConnection, byte> _deadConnections = + new ConcurrentDictionary<IConnection, byte>(); private readonly int _poolSize; private readonly int _maxInProcessPerConnection; private int _connectionIndex; @@ -43,53 +48,84 @@ namespace Gremlin.Net.Driver private const int PoolIdle = 0; private const int PoolPopulationInProgress = 1; - public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolSettings settings) + public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings settings) { _connectionFactory = connectionFactory; _poolSize = settings.PoolSize; _maxInProcessPerConnection = settings.MaxInProcessPerConnection; - PopulatePoolAsync().WaitUnwrap(); + ReplaceDeadConnectionsAsync().WaitUnwrap(); } public int NrConnections => _connections.Count; - public async Task<IConnection> GetAvailableConnectionAsync() + public IConnection GetAvailableConnection() { - await EnsurePoolIsPopulatedAsync().ConfigureAwait(false); - return ProxiedConnection(GetConnectionFromPool()); + var connection = Policy.Handle<ServerUnavailableException>() + .WaitAndRetry(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt))) + .Execute(GetConnectionFromPool); + + return ProxiedConnection(connection); } - private async Task EnsurePoolIsPopulatedAsync() + /// <summary> + /// Replaces dead connections. 
+ /// </summary> + /// <returns>True if the pool was repaired, false if repairing was not necessary.</returns> + private async Task<bool> EnsurePoolIsHealthyAsync() { - // The pool could have been (partially) empty because of connection problems. So, we need to populate it again. - if (_poolSize <= NrConnections) return; + if (_deadConnections.IsEmpty) return false; var poolState = Interlocked.CompareExchange(ref _poolState, PoolPopulationInProgress, PoolIdle); - if (poolState == PoolPopulationInProgress) return; + if (poolState == PoolPopulationInProgress) return false; try { - await PopulatePoolAsync().ConfigureAwait(false); + await ReplaceDeadConnectionsAsync().ConfigureAwait(false); } finally { // We need to remove the PoolPopulationInProgress flag again even if an exception occurred, so we don't block the pool population for ever Interlocked.CompareExchange(ref _poolState, PoolIdle, PoolPopulationInProgress); } + + return true; + } + + private async Task ReplaceDeadConnectionsAsync() + { + RemoveDeadConnections(); + + await FillPoolAsync().ConfigureAwait(false); + } + + private void RemoveDeadConnections() + { + if (_deadConnections.IsEmpty) return; + + foreach (var deadConnection in _deadConnections.Keys) + { + if (_connections.TryRemove(deadConnection)) + { + DefinitelyDestroyConnection(deadConnection); + } + } + + _deadConnections.Clear(); } - private async Task PopulatePoolAsync() + private async Task FillPoolAsync() { var nrConnectionsToCreate = _poolSize - _connections.Count; - var connectionCreationTasks = new List<Task<Connection>>(nrConnectionsToCreate); + var connectionCreationTasks = new List<Task<IConnection>>(nrConnectionsToCreate); try { for (var i = 0; i < nrConnectionsToCreate; i++) { connectionCreationTasks.Add(CreateNewConnectionAsync()); } + var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false); _connections.AddRange(createdConnections); } - catch(Exception) + catch (Exception) { // Dispose created 
connections if the connection establishment failed foreach (var creationTask in connectionCreationTasks) @@ -97,42 +133,45 @@ namespace Gremlin.Net.Driver if (!creationTask.IsFaulted) creationTask.Result?.Dispose(); } + throw; } } - - private async Task<Connection> CreateNewConnectionAsync() + + private async Task<IConnection> CreateNewConnectionAsync() { var newConnection = _connectionFactory.CreateConnection(); await newConnection.ConnectAsync().ConfigureAwait(false); return newConnection; } - private Connection GetConnectionFromPool() + private IConnection GetConnectionFromPool() { var connections = _connections.Snapshot; if (connections.Length == 0) throw new ServerUnavailableException(); return TryGetAvailableConnection(connections); } - - private Connection TryGetAvailableConnection(Connection[] connections) + + private IConnection TryGetAvailableConnection(IConnection[] connections) { var index = Interlocked.Increment(ref _connectionIndex); ProtectIndexFromOverflowing(index); + var closedConnections = 0; for (var i = 0; i < connections.Length; i++) { var connection = connections[(index + i) % connections.Length]; if (connection.NrRequestsInFlight >= _maxInProcessPerConnection) continue; if (!connection.IsOpen) { - RemoveConnectionFromPool(connection); + ReplaceConnection(connection); + closedConnections++; continue; } return connection; } - if (connections.Length > 0) + if (connections.Length > closedConnections) { throw new ConnectionPoolBusyException(_poolSize, _maxInProcessPerConnection); } @@ -148,26 +187,39 @@ namespace Gremlin.Net.Driver Interlocked.Exchange(ref _connectionIndex, 0); } - private void RemoveConnectionFromPool(Connection connection) + private void ReplaceConnection(IConnection connection) { - if (_connections.TryRemove(connection)) - DefinitelyDestroyConnection(connection); + RemoveConnectionFromPool(connection); + TriggerReplacementOfDeadConnections(); } - private IConnection ProxiedConnection(Connection connection) + private void 
RemoveConnectionFromPool(IConnection connection) { - return new ProxyConnection(connection, ReturnConnectionIfOpen); + _deadConnections.TryAdd(connection, 0); } - private void ReturnConnectionIfOpen(Connection connection) + private void TriggerReplacementOfDeadConnections() { - if (connection.IsOpen) return; - ConsiderUnavailable(); + ReplaceClosedConnectionsAsync().Forget(); } - private void ConsiderUnavailable() + private async Task ReplaceClosedConnectionsAsync() { - CloseAndRemoveAllConnectionsAsync().WaitUnwrap(); + var poolWasPopulated = await EnsurePoolIsHealthyAsync().ConfigureAwait(false); + // Another connection could have been removed already, check if another population is necessary + if (poolWasPopulated) + await ReplaceClosedConnectionsAsync().ConfigureAwait(false); + } + + private IConnection ProxiedConnection(IConnection connection) + { + return new ProxyConnection(connection, ReplaceConnectionIfItWasClosed); + } + + private void ReplaceConnectionIfItWasClosed(IConnection connection) + { + if (connection.IsOpen) return; + ReplaceConnection(connection); } private async Task CloseAndRemoveAllConnectionsAsync() @@ -179,7 +231,7 @@ namespace Gremlin.Net.Driver } } - private void DefinitelyDestroyConnection(Connection connection) + private void DefinitelyDestroyConnection(IConnection connection) { connection.Dispose(); } diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs index 2dc44ec..ba559b1 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs @@ -80,7 +80,7 @@ namespace Gremlin.Net.Driver /// <inheritdoc /> public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage) { - using (var connection = await _connectionPool.GetAvailableConnectionAsync().ConfigureAwait(false)) + using (var connection = _connectionPool.GetAvailableConnection()) { return await 
connection.SubmitAsync<T>(requestMessage).ConfigureAwait(false); } diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs index b5ef52c..7d29571 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs @@ -30,6 +30,10 @@ namespace Gremlin.Net.Driver { internal interface IConnection : IDisposable { + Task ConnectAsync(); Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage); + int NrRequestsInFlight { get; } + bool IsOpen { get; } + Task CloseAsync(); } } \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs similarity index 78% copy from gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs copy to gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs index b5ef52c..0c7ace2 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs @@ -21,15 +21,10 @@ #endregion -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Gremlin.Net.Driver.Messages; - namespace Gremlin.Net.Driver { - internal interface IConnection : IDisposable + internal interface IConnectionFactory { - Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage); + IConnection CreateConnection(); } } \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs index fef6ede..421d310 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs @@ -30,23 +30,37 @@ namespace Gremlin.Net.Driver { internal sealed class ProxyConnection : IConnection { - private readonly Connection _realConnection; - private readonly Action<Connection> _releaseAction; + public IConnection 
ProxiedConnection { get; set; } + private readonly Action<IConnection> _releaseAction; - public ProxyConnection(Connection realConnection, Action<Connection> releaseAction) + public ProxyConnection(IConnection proxiedConnection, Action<IConnection> releaseAction) { - _realConnection = realConnection; + ProxiedConnection = proxiedConnection; _releaseAction = releaseAction; } + public async Task ConnectAsync() + { + await ProxiedConnection.ConnectAsync().ConfigureAwait(false); + } + public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage) { - return await _realConnection.SubmitAsync<T>(requestMessage).ConfigureAwait(false); + return await ProxiedConnection.SubmitAsync<T>(requestMessage).ConfigureAwait(false); + } + + public int NrRequestsInFlight => ProxiedConnection.NrRequestsInFlight; + + public bool IsOpen => ProxiedConnection.IsOpen; + + public async Task CloseAsync() + { + await ProxiedConnection.CloseAsync().ConfigureAwait(false); } public void Dispose() { - _releaseAction(_realConnection); + _releaseAction(ProxiedConnection); } } } \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj b/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj index 4a43c81..20b8edf 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj +++ b/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj @@ -63,11 +63,12 @@ NOTE that versions suffixed with "-rc" are considered release candidates (i.e. 
p <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/> <PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Microsoft.CSharp" Version="4.3.0" /> + <PackageReference Include="Polly" Version="7.2.0" /> </ItemGroup> <ItemGroup> - <None Include="../../LICENSE" Pack="true" PackagePath=""/> - <None Include="../../NOTICE" Pack="true" PackagePath=""/> + <None Include="../../LICENSE" Pack="true" PackagePath="" /> + <None Include="../../NOTICE" Pack="true" PackagePath="" /> </ItemGroup> </Project> diff --git a/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs b/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs index 3f90e5d..4351b0e 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs @@ -23,4 +23,5 @@ using System.Runtime.CompilerServices; -[assembly: InternalsVisibleTo("Gremlin.Net.UnitTest, PublicKey=00240000048000009400000006020000002400005253413100040000010001009bbf7a5b9966d9207d8abb9d3d3e98f5e387b292742cfb791dc657357221c3ac9b38ab6dab89630dc8edb3cde84a107f493d192116a934afa463355eefd58b82fd08dc2616ee6074a74bf5845652864746e285bd04e2e1a87921e8e2c383d1b302e7bee1fd7cdab5fe2bbed8c6677624d63433548d43a873ab5650ed96fb0687")] \ No newline at end of file +[assembly: InternalsVisibleTo("Gremlin.Net.UnitTest, PublicKey=00240000048000009400000006020000002400005253413100040000010001009bbf7a5b9966d9207d8abb9d3d3e98f5e387b292742cfb791dc657357221c3ac9b38ab6dab89630dc8edb3cde84a107f493d192116a934afa463355eefd58b82fd08dc2616ee6074a74bf5845652864746e285bd04e2e1a87921e8e2c383d1b302e7bee1fd7cdab5fe2bbed8c6677624d63433548d43a873ab5650ed96fb0687")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")] \ No newline at end of file diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs new file mode 100644 index 0000000..2d33d23 --- /dev/null +++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs @@ -0,0 +1,193 @@ +#region License + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Gremlin.Net.Driver; +using Gremlin.Net.Driver.Exceptions; +using Moq; +using Xunit; + +namespace Gremlin.Net.UnitTest.Driver +{ + public class ConnectionPoolTests + { + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(10)] + public void ShouldEstablishConfiguredNrConnections(int poolSize) + { + var mockedConnectionFactory = new Mock<IConnectionFactory>(); + var mockedConnection = new Mock<IConnection>(); + mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnection.Object); + var pool = CreateConnectionPool(mockedConnectionFactory.Object, poolSize); + + Assert.Equal(poolSize, pool.NrConnections); + mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize)); + mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize)); + } + + [Fact] + public void GetAvailableConnectionShouldReturnFirstOpenConnection() + { + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + var openConnectionToReturn = OpenConnection; + fakeConnectionFactory.SetupSequence(m => m.CreateConnection()).Returns(ClosedConnection) + .Returns(ClosedConnection).Returns(openConnectionToReturn); + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3); + + var returnedConnection = pool.GetAvailableConnection(); + + Assert.Equal(openConnectionToReturn, ((ProxyConnection) returnedConnection).ProxiedConnection); + } + + [Fact] + public void GetAvailableConnectionShouldThrowIfAllConnectionsAreClosed() + { + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection); + var pool = CreateConnectionPool(fakeConnectionFactory.Object); + + Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection()); + } + + [Fact] + public void GetAvailableConnectionShouldReplaceClosedConnections() + { + var fakeConnectionFactory = new 
Mock<IConnectionFactory>(); + fakeConnectionFactory.SetupSequence(m => m.CreateConnection()).Returns(ClosedConnection) + .Returns(ClosedConnection).Returns(OpenConnection); + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection); + var nrCreatedConnections = pool.NrConnections; + + pool.GetAvailableConnection(); + pool.GetAvailableConnection(); + pool.GetAvailableConnection(); + + AssertNrOpenConnections(pool, nrCreatedConnections); + } + + private static void AssertNrOpenConnections(ConnectionPool connectionPool, int expectedNrConnections) + { + for (var i = 0; i < expectedNrConnections; i++) + { + var connection = connectionPool.GetAvailableConnection(); + Assert.True(connection.IsOpen); + } + Assert.Equal(expectedNrConnections, connectionPool.NrConnections); + } + + [Fact] + public async Task ShouldNotCreateMoreConnectionsThanConfiguredForParallelRequests() + { + var mockedConnectionFactory = new Mock<IConnectionFactory>(); + mockedConnectionFactory.SetupSequence(m => m.CreateConnection()).Returns(ClosedConnection) + .Returns(ClosedConnection).Returns(OpenConnection); + var pool = CreateConnectionPool(mockedConnectionFactory.Object, 3); + mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection); + var nrCreatedConnections = pool.NrConnections; + var getConnectionTasks = new List<Task<IConnection>>(); + + for (var i = 0; i < 100; i++) + { + getConnectionTasks.Add(Task.Run(() => pool.GetAvailableConnection())); + } + await Task.WhenAll(getConnectionTasks); + + await Task.Delay(1000); + Assert.Equal(nrCreatedConnections, pool.NrConnections); + } + + [Fact] + public async Task ShouldReplaceConnectionClosedDuringSubmit() + { + var mockedConnectionFactory = new Mock<IConnectionFactory>(); + var fakedConnection = new Mock<IConnection>(); + fakedConnection.Setup(f => f.IsOpen).Returns(true); + mockedConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(fakedConnection.Object); + var pool = CreateConnectionPool(mockedConnectionFactory.Object, 1); + var returnedConnection = pool.GetAvailableConnection(); + fakedConnection.Setup(f => f.IsOpen).Returns(false); + mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection); + + await returnedConnection.SubmitAsync<bool>(null); + returnedConnection.Dispose(); + + Assert.Equal(1, pool.NrConnections); + Assert.True(pool.GetAvailableConnection().IsOpen); + } + + [Fact] + public void ShouldWaitForHostToBecomeAvailable() + { + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection); + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection); + var nrCreatedConnections = pool.NrConnections; + + var connection = pool.GetAvailableConnection(); + + AssertNrOpenConnections(pool, nrCreatedConnections); + Assert.True(connection.IsOpen); + } + + [Fact] + public void ShouldThrowAfterWaitingTooLongForUnavailableServer() + { + var fakeConnectionFactory = new Mock<IConnectionFactory>(); + fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection); + var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1); + + Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection()); + } + + private static IConnection OpenConnection + { + get + { + var fakedConnection = new Mock<IConnection>(); + fakedConnection.Setup(f => f.IsOpen).Returns(true); + return fakedConnection.Object; + } + } + + private static IConnection ClosedConnection + { + get + { + var fakedConnection = new Mock<IConnection>(); + fakedConnection.Setup(f => f.IsOpen).Returns(false); + return fakedConnection.Object; + } + } + + private static ConnectionPool CreateConnectionPool(IConnectionFactory connectionFactory, int poolSize = 2) + { + return new 
ConnectionPool(connectionFactory, new ConnectionPoolSettings {PoolSize = poolSize}); + } + } +} \ No newline at end of file
