Use own AsyncAutoResetEvent in ConnectionPool

This replaces the synchronous AutoResetEvent with our own
AsyncAutoResetEvent to avoid blocking threads that wait for an available
connection.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e16e6246
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e16e6246
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e16e6246

Branch: refs/heads/TINKERPOP-1774
Commit: e16e6246ff9e1a42e9f94e6c376af9882508987b
Parents: 962ebe3
Author: Florian Hockmann <f...@florian-hockmann.de>
Authored: Thu Aug 30 16:24:51 2018 +0200
Committer: Florian Hockmann <f...@florian-hockmann.de>
Committed: Thu Aug 30 16:30:06 2018 +0200

----------------------------------------------------------------------
 .../Gremlin.Net/Driver/AsyncAutoResetEvent.cs   | 103 +++++++++++
 .../src/Gremlin.Net/Driver/ConnectionPool.cs    |   9 +-
 .../Driver/AsyncAutoResetEventTests.cs          | 169 +++++++++++++++++++
 pom.xml                                         |   1 +
 4 files changed, 278 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs
----------------------------------------------------------------------
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs
new file mode 100644
index 0000000..52c07b0
--- /dev/null
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs
@@ -0,0 +1,103 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+// The implementation is based on this blog post by Stephen Toub:
+// 
https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/
+
+namespace Gremlin.Net.Driver
+{
+    /// <summary>
+    ///     An async version of the AutoResetEvent.
+    /// </summary>
+    public class AsyncAutoResetEvent
+    {
+        private static readonly Task<bool> CompletedTask = 
Task.FromResult(true);
+        private readonly List<TaskCompletionSource<bool>> _waitingTasks = new 
List<TaskCompletionSource<bool>>();
+        private bool _isSignaled;
+        
+        /// <summary>
+        ///     Asynchronously waits for this event to be set or until a 
timeout occurs.
+        /// </summary>
+        /// <param name="timeout">A <see cref="TimeSpan"/> that 
represents the number of milliseconds to wait.</param>
+        /// <returns>true if the current instance received a signal before 
timing out; otherwise, false.</returns>
+        public async Task<bool> WaitOneAsync(TimeSpan timeout)
+        {
+            var tcs = new TaskCompletionSource<bool>();
+            var waitTask = WaitForSignalAsync(tcs);
+            if (waitTask.IsCompleted) return true;
+            
+            await Task.WhenAny(waitTask, 
Task.Delay(timeout)).ConfigureAwait(false);
+            lock (_waitingTasks)
+            {
+                if (!waitTask.IsCompleted)
+                {
+                    // The wait timed out, so we need to remove the waiting 
task.
+                    _waitingTasks.Remove(tcs);
+                    tcs.SetResult(false);
+                }
+            }
+            
+            return waitTask.Result;
+        }
+
+        private Task<bool> WaitForSignalAsync(TaskCompletionSource<bool> tcs)
+        {
+            lock (_waitingTasks)
+            {
+                if (_isSignaled)
+                {
+                    _isSignaled = false;
+                    return CompletedTask;
+                }
+                _waitingTasks.Add(tcs);
+            }
+            return tcs.Task;
+        }
+        
+        /// <summary>
+        ///     Sets the event.
+        /// </summary>
+        public void Set()
+        {
+            TaskCompletionSource<bool> toRelease = null;
+            lock (_waitingTasks)
+            {
+                if (_waitingTasks.Count == 0)
+                {
+                    _isSignaled = true;
+                }
+                else
+                {
+                    toRelease = _waitingTasks[0];
+                    _waitingTasks.RemoveAt(0);
+                }
+            }
+
+            toRelease?.SetResult(true);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
----------------------------------------------------------------------
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs 
b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index a65208a..e76cf51 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -34,7 +34,7 @@ namespace Gremlin.Net.Driver
     {
         private readonly ConnectionFactory _connectionFactory;
         private readonly ConcurrentBag<Connection> _connections = new 
ConcurrentBag<Connection>();
-        private readonly AutoResetEvent _newConnectionAvailable = new 
AutoResetEvent(false);
+        private readonly AsyncAutoResetEvent _newConnectionAvailable = new 
AsyncAutoResetEvent();
         private readonly int _minPoolSize;
         private readonly int _maxPoolSize;
         private readonly TimeSpan _waitForConnectionTimeout;
@@ -72,7 +72,8 @@ namespace Gremlin.Net.Driver
         {
             if (TryGetConnectionFromPool(out var connection))
                 return ProxiedConnection(connection);
-            connection = await 
AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? WaitForConnection();
+            connection = await 
AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ??
+                         await WaitForConnectionAsync().ConfigureAwait(false);
             return ProxiedConnection(connection);
         }
 
@@ -115,13 +116,13 @@ namespace Gremlin.Net.Driver
             return newConnection;
         }
 
-        private Connection WaitForConnection()
+        private async Task<Connection> WaitForConnectionAsync()
         {
             var start = DateTimeOffset.Now;
             var remaining = _waitForConnectionTimeout;
             do
             {
-                if (_newConnectionAvailable.WaitOne(remaining))
+                if (await 
_newConnectionAvailable.WaitOneAsync(remaining).ConfigureAwait(false))
                 {
                     if (TryGetConnectionFromPool(out var connection))
                         return connection;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs
----------------------------------------------------------------------
diff --git 
a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs 
b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs
new file mode 100644
index 0000000..26a5a58
--- /dev/null
+++ 
b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs
@@ -0,0 +1,169 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Gremlin.Net.Driver;
+using Xunit;
+
+namespace Gremlin.Net.UnitTest.Driver
+{
+    public class AsyncAutoResetEventTests
+    {
+        private static readonly TimeSpan DefaultTimeout = 
TimeSpan.FromMilliseconds(100);
+        
+        [Fact]
+        public async Task WaitOneAsync_AfterSet_CompletesSynchronously()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            are.Set();
+            var task = are.WaitOneAsync(DefaultTimeout);
+            
+            Assert.True(task.IsCompleted);
+            Assert.True(await task);
+        }
+        
+        [Fact]
+        public async Task 
MultipleWaitOneAsync_AfterSet_OnlyFirstWaitIsSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            are.Set();
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.True(task1.IsCompleted);
+            Assert.True(await task1);
+            Assert.False(await task2);
+        }
+
+        [Fact]
+        public async Task 
MultipleWaitOneAsync_AfterMultipleSet_OnlyFirstWaitIsSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            are.Set();
+            are.Set();
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.True(task1.IsCompleted);
+            Assert.True(await task1);
+            Assert.False(await task2);
+        }
+        
+        [Fact]
+        public async Task WaitOneAsync_SetBeforeTimeout_WaitSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+            
+            var task = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+            
+            Assert.True(await task);
+        }
+
+        [Fact]
+        public async Task 
Set_AfterMultipleWaitOneAsync_OnlyFirstWaitIsSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+            
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+
+            await AssertCompletesBeforeTimeoutAsync(task1, 
DefaultTimeout.Milliseconds + 50);
+            Assert.False(await task2);
+        }
+        
+        [Fact]
+        public async Task WaitOneAsync_NotSet_OnlyWaitUntilTimeout()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task = are.WaitOneAsync(DefaultTimeout);
+
+            await AssertCompletesBeforeTimeoutAsync(task, 
DefaultTimeout.Milliseconds + 50);
+        }
+        
+        [Fact]
+        public async Task WaitOneAsync_NotSet_WaitNotSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.False(await task);
+        }
+
+        [Fact]
+        public async Task 
WaitOneAsync_SetAfterPreviousWaitTimedOut_OnlySecondWaitSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50));
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+            
+            Assert.False(await task1);
+            Assert.True(await task2);
+        }
+        
+        [Fact]
+        public async Task 
WaitOneAsync_SetAfterMultipleWaitsTimedOut_OnlyLastWaitSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var timedOutTasks = new List<Task<bool>>();
+            for (var i = 0; i < 1000; i++)
+            {
+                timedOutTasks.Add(are.WaitOneAsync(DefaultTimeout));
+            }
+            
+            await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50));
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+
+            foreach (var t in timedOutTasks)
+            {
+                Assert.False(await t);
+            }
+            Assert.True(await task2);
+        }
+
+        private static async Task AssertCompletesBeforeTimeoutAsync(Task task, 
int timeoutInMs)
+        {
+            var completedTask = await WaitForTaskOrTimeoutAsync(task, 
TimeSpan.FromMilliseconds(timeoutInMs));
+            if (completedTask != task)
+                throw new Exception("Task did not complete.");
+        }
+
+        private static Task<Task> WaitForTaskOrTimeoutAsync(Task task, 
TimeSpan timeout)
+        {
+            return Task.WhenAny(task, Task.Delay(timeout));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfb5824..50e84c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -401,6 +401,7 @@ limitations under the License.
                         <exclude>**/node/node_modules/**</exclude>
                         <exclude>**/node/node</exclude>
                         <exclude>**/npm-debug.log</exclude>
+                        <exclude>**/.idea/**</exclude>
                     </excludes>
                     <licenses>
                         <license 
implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>

Reply via email to