This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2288
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 217018e890f355d32318daeb1d380978853b8394
Author: Florian Hockmann <[email protected]>
AuthorDate: Tue Mar 10 16:27:42 2020 +0100

    TINKERPOP-2288 Replace closed connections directly
    
    Closed connections are now replaced automatically in the background.
    If no open connection is available to answer a request, the pool
    retries after a delay, using a retry policy with exponential backoff
    that is implemented with Polly.
    This change also ensures that only one task performs a pool resizing
    operation at a time.
    
    These changes should ensure that:
    - A connection is still returned quickly if one is available.
    - Closed connections are replaced immediately, without needing to wait
     for the next incoming request.
    - If the server is only temporarily unavailable (or it just closed
      open connections for some reason), then the user should not get
      an exception. They only have to wait until the connections are
      replaced.
    
    TODO:
    - Make the retry policy configurable.
    - Document changes.
---
 gremlin-dotnet/glv/Gremlin.Net.csproj.template     |   5 +-
 .../src/Gremlin.Net/Driver/ConnectionFactory.cs    |   4 +-
 .../src/Gremlin.Net/Driver/ConnectionPool.cs       | 118 +++++++++----
 .../src/Gremlin.Net/Driver/GremlinClient.cs        |   2 +-
 .../src/Gremlin.Net/Driver/IConnection.cs          |   4 +
 .../{IConnection.cs => IConnectionFactory.cs}      |   9 +-
 .../src/Gremlin.Net/Driver/ProxyConnection.cs      |  26 ++-
 gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj  |   5 +-
 .../src/Gremlin.Net/Properties/AssemblyInfo.cs     |   3 +-
 .../Driver/ConnectionPoolTests.cs                  | 193 +++++++++++++++++++++
 10 files changed, 315 insertions(+), 54 deletions(-)

diff --git a/gremlin-dotnet/glv/Gremlin.Net.csproj.template 
b/gremlin-dotnet/glv/Gremlin.Net.csproj.template
index a6a57a3..d7bac7e 100644
--- a/gremlin-dotnet/glv/Gremlin.Net.csproj.template
+++ b/gremlin-dotnet/glv/Gremlin.Net.csproj.template
@@ -63,11 +63,12 @@ NOTE that versions suffixed with "-rc" are considered 
release candidates (i.e. p
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" 
PrivateAssets="All"/>
     <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
     <PackageReference Include="Microsoft.CSharp" Version="4.3.0" />
+    <PackageReference Include="Polly" Version="7.2.0" />
   </ItemGroup>
 
   <ItemGroup>
-    <None Include="../../LICENSE" Pack="true" PackagePath=""/>
-    <None Include="../../NOTICE" Pack="true" PackagePath=""/>
+    <None Include="../../LICENSE" Pack="true" PackagePath="" />
+    <None Include="../../NOTICE" Pack="true" PackagePath="" />
   </ItemGroup>
 
 </Project>
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs
index 7a6c2d5..c59aba7 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs
@@ -27,7 +27,7 @@ using Gremlin.Net.Structure.IO.GraphSON;
 
 namespace Gremlin.Net.Driver
 {
-    internal class ConnectionFactory
+    internal class ConnectionFactory : IConnectionFactory
     {
         private readonly GraphSONReader _graphSONReader;
         private readonly GraphSONWriter _graphSONWriter;
@@ -45,7 +45,7 @@ namespace Gremlin.Net.Driver
             _webSocketConfiguration = webSocketConfiguration;
         }
 
-        public Connection CreateConnection()
+        public IConnection CreateConnection()
         {
             return new Connection(_gremlinServer.Uri, _gremlinServer.Username, 
_gremlinServer.Password, _graphSONReader,
                                  _graphSONWriter, _mimeType, 
_webSocketConfiguration);
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index 34bc77f..50138f7 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -22,11 +22,13 @@
 #endregion
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Exceptions;
 using Gremlin.Net.Process;
+using Polly;
 
 namespace Gremlin.Net.Driver
 {
@@ -34,8 +36,11 @@ namespace Gremlin.Net.Driver
     {
         private const int ConnectionIndexOverflowLimit = int.MaxValue - 
1000000;
         
-        private readonly ConnectionFactory _connectionFactory;
-        private readonly CopyOnWriteCollection<Connection> _connections = new 
CopyOnWriteCollection<Connection>();
+        private readonly IConnectionFactory _connectionFactory;
+        private readonly CopyOnWriteCollection<IConnection> _connections = new 
CopyOnWriteCollection<IConnection>();
+
+        private readonly ConcurrentDictionary<IConnection, byte> 
_deadConnections =
+            new ConcurrentDictionary<IConnection, byte>();
         private readonly int _poolSize;
         private readonly int _maxInProcessPerConnection;
         private int _connectionIndex;
@@ -43,53 +48,84 @@ namespace Gremlin.Net.Driver
         private const int PoolIdle = 0;
         private const int PoolPopulationInProgress = 1;
 
-        public ConnectionPool(ConnectionFactory connectionFactory, 
ConnectionPoolSettings settings)
+        public ConnectionPool(IConnectionFactory connectionFactory, 
ConnectionPoolSettings settings)
         {
             _connectionFactory = connectionFactory;
             _poolSize = settings.PoolSize;
             _maxInProcessPerConnection = settings.MaxInProcessPerConnection;
-            PopulatePoolAsync().WaitUnwrap();
+            ReplaceDeadConnectionsAsync().WaitUnwrap();
         }
         
         public int NrConnections => _connections.Count;
 
-        public async Task<IConnection> GetAvailableConnectionAsync()
+        public IConnection GetAvailableConnection()
         {
-            await EnsurePoolIsPopulatedAsync().ConfigureAwait(false);
-            return ProxiedConnection(GetConnectionFromPool());
+            var connection = Policy.Handle<ServerUnavailableException>()
+                .WaitAndRetry(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, 
attempt)))
+                .Execute(GetConnectionFromPool);
+
+            return ProxiedConnection(connection);
         }
 
-        private async Task EnsurePoolIsPopulatedAsync()
+        /// <summary>
+        ///     Replaces dead connections.
+        /// </summary>
+        /// <returns>True if the pool was repaired, false if repairing was not 
necessary.</returns>
+        private async Task<bool> EnsurePoolIsHealthyAsync()
         {
-            // The pool could have been (partially) empty because of 
connection problems. So, we need to populate it again.
-            if (_poolSize <= NrConnections) return;
+            if (_deadConnections.IsEmpty) return false;
             var poolState = Interlocked.CompareExchange(ref _poolState, 
PoolPopulationInProgress, PoolIdle);
-            if (poolState == PoolPopulationInProgress) return;
+            if (poolState == PoolPopulationInProgress) return false;
             try
             {
-                await PopulatePoolAsync().ConfigureAwait(false);
+                await ReplaceDeadConnectionsAsync().ConfigureAwait(false);
             }
             finally
             {
                 // We need to remove the PoolPopulationInProgress flag again 
even if an exception occurred, so we don't block the pool population for ever
                 Interlocked.CompareExchange(ref _poolState, PoolIdle, 
PoolPopulationInProgress);
             }
+
+            return true;
+        }
+        
+        private async Task ReplaceDeadConnectionsAsync()
+        {
+            RemoveDeadConnections();
+
+            await FillPoolAsync().ConfigureAwait(false);
+        }
+
+        private void RemoveDeadConnections()
+        {
+            if (_deadConnections.IsEmpty) return;
+            
+            foreach (var deadConnection in _deadConnections.Keys)
+            {
+                if (_connections.TryRemove(deadConnection))
+                {
+                    DefinitelyDestroyConnection(deadConnection);
+                }
+            }
+
+            _deadConnections.Clear();
         }
         
-        private async Task PopulatePoolAsync()
+        private async Task FillPoolAsync()
         {
             var nrConnectionsToCreate = _poolSize - _connections.Count;
-            var connectionCreationTasks = new 
List<Task<Connection>>(nrConnectionsToCreate);
+            var connectionCreationTasks = new 
List<Task<IConnection>>(nrConnectionsToCreate);
             try
             {
                 for (var i = 0; i < nrConnectionsToCreate; i++)
                 {
                     connectionCreationTasks.Add(CreateNewConnectionAsync());
                 }
+
                 var createdConnections = await 
Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
                 _connections.AddRange(createdConnections);
             }
-            catch(Exception)
+            catch (Exception)
             {
                 // Dispose created connections if the connection establishment 
failed
                 foreach (var creationTask in connectionCreationTasks)
@@ -97,42 +133,45 @@ namespace Gremlin.Net.Driver
                     if (!creationTask.IsFaulted)
                         creationTask.Result?.Dispose();
                 }
+
                 throw;
             }
         }
-        
-        private async Task<Connection> CreateNewConnectionAsync()
+
+        private async Task<IConnection> CreateNewConnectionAsync()
         {
             var newConnection = _connectionFactory.CreateConnection();
             await newConnection.ConnectAsync().ConfigureAwait(false);
             return newConnection;
         }
 
-        private Connection GetConnectionFromPool()
+        private IConnection GetConnectionFromPool()
         {
             var connections = _connections.Snapshot;
             if (connections.Length == 0) throw new 
ServerUnavailableException();
             return TryGetAvailableConnection(connections);
         }
-
-        private Connection TryGetAvailableConnection(Connection[] connections)
+        
+        private IConnection TryGetAvailableConnection(IConnection[] 
connections)
         {
             var index = Interlocked.Increment(ref _connectionIndex);
             ProtectIndexFromOverflowing(index);
 
+            var closedConnections = 0;
             for (var i = 0; i < connections.Length; i++)
             {
                 var connection = connections[(index + i) % connections.Length];
                 if (connection.NrRequestsInFlight >= 
_maxInProcessPerConnection) continue;
                 if (!connection.IsOpen)
                 {
-                    RemoveConnectionFromPool(connection);
+                    ReplaceConnection(connection);
+                    closedConnections++;
                     continue;
                 }
                 return connection;
             }
 
-            if (connections.Length > 0) 
+            if (connections.Length > closedConnections) 
             {
                 throw new ConnectionPoolBusyException(_poolSize, 
_maxInProcessPerConnection);
             }
@@ -148,26 +187,39 @@ namespace Gremlin.Net.Driver
                 Interlocked.Exchange(ref _connectionIndex, 0);
         }
 
-        private void RemoveConnectionFromPool(Connection connection)
+        private void ReplaceConnection(IConnection connection)
         {
-            if (_connections.TryRemove(connection))
-                DefinitelyDestroyConnection(connection);
+            RemoveConnectionFromPool(connection);
+            TriggerReplacementOfDeadConnections();
         }
         
-        private IConnection ProxiedConnection(Connection connection)
+        private void RemoveConnectionFromPool(IConnection connection)
         {
-            return new ProxyConnection(connection, ReturnConnectionIfOpen);
+            _deadConnections.TryAdd(connection, 0);
         }
 
-        private void ReturnConnectionIfOpen(Connection connection)
+        private void TriggerReplacementOfDeadConnections()
         {
-            if (connection.IsOpen) return;
-            ConsiderUnavailable();
+            ReplaceClosedConnectionsAsync().Forget();
         }
 
-        private void ConsiderUnavailable()
+        private async Task ReplaceClosedConnectionsAsync()
         {
-            CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
+            var poolWasPopulated = await 
EnsurePoolIsHealthyAsync().ConfigureAwait(false);
+            // Another connection could have been removed already, check if 
another population is necessary
+            if (poolWasPopulated)
+                await ReplaceClosedConnectionsAsync().ConfigureAwait(false);
+        }
+
+        private IConnection ProxiedConnection(IConnection connection)
+        {
+            return new ProxyConnection(connection, 
ReplaceConnectionIfItWasClosed);
+        }
+
+        private void ReplaceConnectionIfItWasClosed(IConnection connection)
+        {
+            if (connection.IsOpen) return;
+            ReplaceConnection(connection);
         }
 
         private async Task CloseAndRemoveAllConnectionsAsync()
@@ -179,7 +231,7 @@ namespace Gremlin.Net.Driver
             }
         }
 
-        private void DefinitelyDestroyConnection(Connection connection)
+        private void DefinitelyDestroyConnection(IConnection connection)
         {
             connection.Dispose();
         }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs
index 2dc44ec..ba559b1 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs
@@ -80,7 +80,7 @@ namespace Gremlin.Net.Driver
         /// <inheritdoc />
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage)
         {
-            using (var connection = await 
_connectionPool.GetAvailableConnectionAsync().ConfigureAwait(false))
+            using (var connection = _connectionPool.GetAvailableConnection())
             {
                 return await 
connection.SubmitAsync<T>(requestMessage).ConfigureAwait(false);
             }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
index b5ef52c..7d29571 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
@@ -30,6 +30,10 @@ namespace Gremlin.Net.Driver
 {
     internal interface IConnection : IDisposable
     {
+        Task ConnectAsync();
         Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
+        int NrRequestsInFlight { get; }
+        bool IsOpen { get; }
+        Task CloseAsync();
     }
 }
\ No newline at end of file
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs
similarity index 78%
copy from gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
copy to gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs
index b5ef52c..0c7ace2 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs
@@ -21,15 +21,10 @@
 
 #endregion
 
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Gremlin.Net.Driver.Messages;
-
 namespace Gremlin.Net.Driver
 {
-    internal interface IConnection : IDisposable
+    internal interface IConnectionFactory
     {
-        Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
+        IConnection CreateConnection();
     }
 }
\ No newline at end of file
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
index fef6ede..421d310 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
@@ -30,23 +30,37 @@ namespace Gremlin.Net.Driver
 {
     internal sealed class ProxyConnection : IConnection
     {
-        private readonly Connection _realConnection;
-        private readonly Action<Connection> _releaseAction;
+        public IConnection ProxiedConnection { get; set; }
+        private readonly Action<IConnection> _releaseAction;
 
-        public ProxyConnection(Connection realConnection, Action<Connection> 
releaseAction)
+        public ProxyConnection(IConnection proxiedConnection, 
Action<IConnection> releaseAction)
         {
-            _realConnection = realConnection;
+            ProxiedConnection = proxiedConnection;
             _releaseAction = releaseAction;
         }
 
+        public async Task ConnectAsync()
+        {
+            await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
+        }
+
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage)
         {
-            return await 
_realConnection.SubmitAsync<T>(requestMessage).ConfigureAwait(false);
+            return await 
ProxiedConnection.SubmitAsync<T>(requestMessage).ConfigureAwait(false);
+        }
+
+        public int NrRequestsInFlight => ProxiedConnection.NrRequestsInFlight;
+
+        public bool IsOpen => ProxiedConnection.IsOpen;
+
+        public async Task CloseAsync()
+        {
+            await ProxiedConnection.CloseAsync().ConfigureAwait(false);
         }
 
         public void Dispose()
         {
-            _releaseAction(_realConnection);
+            _releaseAction(ProxiedConnection);
         }
     }
 }
\ No newline at end of file
diff --git a/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj 
b/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj
index 4a43c81..20b8edf 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj
+++ b/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj
@@ -63,11 +63,12 @@ NOTE that versions suffixed with "-rc" are considered 
release candidates (i.e. p
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" 
PrivateAssets="All"/>
     <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
     <PackageReference Include="Microsoft.CSharp" Version="4.3.0" />
+    <PackageReference Include="Polly" Version="7.2.0" />
   </ItemGroup>
 
   <ItemGroup>
-    <None Include="../../LICENSE" Pack="true" PackagePath=""/>
-    <None Include="../../NOTICE" Pack="true" PackagePath=""/>
+    <None Include="../../LICENSE" Pack="true" PackagePath="" />
+    <None Include="../../NOTICE" Pack="true" PackagePath="" />
   </ItemGroup>
 
 </Project>
diff --git a/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs 
b/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs
index 3f90e5d..4351b0e 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs
@@ -23,4 +23,5 @@
 
 using System.Runtime.CompilerServices;
 
-[assembly: InternalsVisibleTo("Gremlin.Net.UnitTest, 
PublicKey=00240000048000009400000006020000002400005253413100040000010001009bbf7a5b9966d9207d8abb9d3d3e98f5e387b292742cfb791dc657357221c3ac9b38ab6dab89630dc8edb3cde84a107f493d192116a934afa463355eefd58b82fd08dc2616ee6074a74bf5845652864746e285bd04e2e1a87921e8e2c383d1b302e7bee1fd7cdab5fe2bbed8c6677624d63433548d43a873ab5650ed96fb0687")]
\ No newline at end of file
+[assembly: InternalsVisibleTo("Gremlin.Net.UnitTest, 
PublicKey=00240000048000009400000006020000002400005253413100040000010001009bbf7a5b9966d9207d8abb9d3d3e98f5e387b292742cfb791dc657357221c3ac9b38ab6dab89630dc8edb3cde84a107f493d192116a934afa463355eefd58b82fd08dc2616ee6074a74bf5845652864746e285bd04e2e1a87921e8e2c383d1b302e7bee1fd7cdab5fe2bbed8c6677624d63433548d43a873ab5650ed96fb0687")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
\ No newline at end of file
diff --git 
a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs 
b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
new file mode 100644
index 0000000..2d33d23
--- /dev/null
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
@@ -0,0 +1,193 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Gremlin.Net.Driver;
+using Gremlin.Net.Driver.Exceptions;
+using Moq;
+using Xunit;
+
+namespace Gremlin.Net.UnitTest.Driver
+{
+    public class ConnectionPoolTests
+    {
+        [Theory]
+        [InlineData(1)]
+        [InlineData(2)]
+        [InlineData(10)]
+        public void ShouldEstablishConfiguredNrConnections(int poolSize)
+        {
+            var mockedConnectionFactory = new Mock<IConnectionFactory>();
+            var mockedConnection = new Mock<IConnection>();
+            mockedConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(mockedConnection.Object);
+            var pool = CreateConnectionPool(mockedConnectionFactory.Object, 
poolSize);
+            
+            Assert.Equal(poolSize, pool.NrConnections);
+            mockedConnectionFactory.Verify(m => m.CreateConnection(), 
Times.Exactly(poolSize));
+            mockedConnection.Verify(m => m.ConnectAsync(), 
Times.Exactly(poolSize));
+        }
+
+        [Fact]
+        public void GetAvailableConnectionShouldReturnFirstOpenConnection()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            var openConnectionToReturn = OpenConnection;
+            fakeConnectionFactory.SetupSequence(m => 
m.CreateConnection()).Returns(ClosedConnection)
+                .Returns(ClosedConnection).Returns(openConnectionToReturn);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3);
+
+            var returnedConnection = pool.GetAvailableConnection();
+
+            Assert.Equal(openConnectionToReturn, ((ProxyConnection) 
returnedConnection).ProxiedConnection);
+        }
+        
+        [Fact]
+        public void 
GetAvailableConnectionShouldThrowIfAllConnectionsAreClosed()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object);
+
+            Assert.Throws<ServerUnavailableException>(() => 
pool.GetAvailableConnection());
+        }
+        
+        [Fact]
+        public void GetAvailableConnectionShouldReplaceClosedConnections()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.SetupSequence(m => 
m.CreateConnection()).Returns(ClosedConnection)
+                .Returns(ClosedConnection).Returns(OpenConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3);
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(OpenConnection);
+            var nrCreatedConnections = pool.NrConnections;
+            
+            pool.GetAvailableConnection();
+            pool.GetAvailableConnection();
+            pool.GetAvailableConnection();
+
+            AssertNrOpenConnections(pool, nrCreatedConnections);
+        }
+
+        private static void AssertNrOpenConnections(ConnectionPool 
connectionPool, int expectedNrConnections)
+        {
+            for (var i = 0; i < expectedNrConnections; i++)
+            {
+                var connection = connectionPool.GetAvailableConnection();
+                Assert.True(connection.IsOpen);
+            }
+            Assert.Equal(expectedNrConnections, connectionPool.NrConnections);
+        }
+        
+        [Fact]
+        public async Task 
ShouldNotCreateMoreConnectionsThanConfiguredForParallelRequests()
+        {
+            var mockedConnectionFactory = new Mock<IConnectionFactory>();
+            mockedConnectionFactory.SetupSequence(m => 
m.CreateConnection()).Returns(ClosedConnection)
+                .Returns(ClosedConnection).Returns(OpenConnection);
+            var pool = CreateConnectionPool(mockedConnectionFactory.Object, 3);
+            mockedConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(OpenConnection);
+            var nrCreatedConnections = pool.NrConnections;
+            var getConnectionTasks = new List<Task<IConnection>>();
+
+            for (var i = 0; i < 100; i++)
+            {
+                getConnectionTasks.Add(Task.Run(() => 
pool.GetAvailableConnection()));
+            }
+            await Task.WhenAll(getConnectionTasks);
+
+            await Task.Delay(1000);
+            Assert.Equal(nrCreatedConnections, pool.NrConnections);
+        }
+
+        [Fact]
+        public async Task ShouldReplaceConnectionClosedDuringSubmit()
+        {
+            var mockedConnectionFactory = new Mock<IConnectionFactory>();
+            var fakedConnection = new Mock<IConnection>();
+            fakedConnection.Setup(f => f.IsOpen).Returns(true);
+            mockedConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(fakedConnection.Object);
+            var pool = CreateConnectionPool(mockedConnectionFactory.Object, 1);
+            var returnedConnection = pool.GetAvailableConnection();
+            fakedConnection.Setup(f => f.IsOpen).Returns(false);
+            mockedConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(OpenConnection);
+
+            await returnedConnection.SubmitAsync<bool>(null);
+            returnedConnection.Dispose();
+
+            Assert.Equal(1, pool.NrConnections);
+            Assert.True(pool.GetAvailableConnection().IsOpen);
+        }
+
+        [Fact]
+        public void ShouldWaitForHostToBecomeAvailable()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(OpenConnection);
+            var nrCreatedConnections = pool.NrConnections;
+            
+            var connection = pool.GetAvailableConnection();
+
+            AssertNrOpenConnections(pool, nrCreatedConnections);
+            Assert.True(connection.IsOpen);
+        }
+
+        [Fact]
+        public void ShouldThrowAfterWaitingTooLongForUnavailableServer()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => 
m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
+            
+            Assert.Throws<ServerUnavailableException>(() => 
pool.GetAvailableConnection());
+        }
+
+        private static IConnection OpenConnection
+        {
+            get
+            {
+                var fakedConnection = new Mock<IConnection>();
+                fakedConnection.Setup(f => f.IsOpen).Returns(true);
+                return fakedConnection.Object;
+            }
+        }
+        
+        private static IConnection ClosedConnection
+        {
+            get
+            {
+                var fakedConnection = new Mock<IConnection>();
+                fakedConnection.Setup(f => f.IsOpen).Returns(false);
+                return fakedConnection.Object;
+            }
+        }
+
+        private static ConnectionPool CreateConnectionPool(IConnectionFactory 
connectionFactory, int poolSize = 2)
+        {
+            return new ConnectionPool(connectionFactory, new 
ConnectionPoolSettings {PoolSize = poolSize});
+        }
+    }
+}
\ No newline at end of file

Reply via email to