Use own AsyncAutoResetEvent in ConnectionPool. This replaces the synchronous AutoResetEvent with our own AsyncAutoResetEvent to avoid blocking threads that wait for an available connection.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e16e6246 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e16e6246 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e16e6246 Branch: refs/heads/TINKERPOP-1774 Commit: e16e6246ff9e1a42e9f94e6c376af9882508987b Parents: 962ebe3 Author: Florian Hockmann <f...@florian-hockmann.de> Authored: Thu Aug 30 16:24:51 2018 +0200 Committer: Florian Hockmann <f...@florian-hockmann.de> Committed: Thu Aug 30 16:30:06 2018 +0200 ---------------------------------------------------------------------- .../Gremlin.Net/Driver/AsyncAutoResetEvent.cs | 103 +++++++++++ .../src/Gremlin.Net/Driver/ConnectionPool.cs | 9 +- .../Driver/AsyncAutoResetEventTests.cs | 169 +++++++++++++++++++ pom.xml | 1 + 4 files changed, 278 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs ---------------------------------------------------------------------- diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs new file mode 100644 index 0000000..52c07b0 --- /dev/null +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs @@ -0,0 +1,103 @@ +#region License + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +// The implementation is based on this blog post by Stephen Toub: +// https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/ + +namespace Gremlin.Net.Driver +{ + /// <summary> + /// An async version of the AutoResetEvent. + /// </summary> + public class AsyncAutoResetEvent + { + private static readonly Task<bool> CompletedTask = Task.FromResult(true); + private readonly List<TaskCompletionSource<bool>> _waitingTasks = new List<TaskCompletionSource<bool>>(); + private bool _isSignaled; + + /// <summary> + /// Asynchronously waits for this event to be set or until a timeout occurs. + /// </summary> + /// <param name="timeout">A <see cref="TimeSpan"/> that represents the number of milliseconds to wait.</param> + /// <returns>true if the current instance received a signal before timing out; otherwise, false.</returns> + public async Task<bool> WaitOneAsync(TimeSpan timeout) + { + var tcs = new TaskCompletionSource<bool>(); + var waitTask = WaitForSignalAsync(tcs); + if (waitTask.IsCompleted) return true; + + await Task.WhenAny(waitTask, Task.Delay(timeout)).ConfigureAwait(false); + lock (_waitingTasks) + { + if (!waitTask.IsCompleted) + { + // The wait timed out, so we need to remove the waiting task.
+ _waitingTasks.Remove(tcs); + tcs.SetResult(false); + } + } + + return waitTask.Result; + } + + private Task<bool> WaitForSignalAsync(TaskCompletionSource<bool> tcs) + { + lock (_waitingTasks) + { + if (_isSignaled) + { + _isSignaled = false; + return CompletedTask; + } + _waitingTasks.Add(tcs); + } + return tcs.Task; + } + + /// <summary> + /// Sets the event. + /// </summary> + public void Set() + { + TaskCompletionSource<bool> toRelease = null; + lock (_waitingTasks) + { + if (_waitingTasks.Count == 0) + { + _isSignaled = true; + } + else + { + toRelease = _waitingTasks[0]; + _waitingTasks.RemoveAt(0); + } + } + + toRelease?.SetResult(true); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs ---------------------------------------------------------------------- diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs index a65208a..e76cf51 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs @@ -34,7 +34,7 @@ namespace Gremlin.Net.Driver { private readonly ConnectionFactory _connectionFactory; private readonly ConcurrentBag<Connection> _connections = new ConcurrentBag<Connection>(); - private readonly AutoResetEvent _newConnectionAvailable = new AutoResetEvent(false); + private readonly AsyncAutoResetEvent _newConnectionAvailable = new AsyncAutoResetEvent(); private readonly int _minPoolSize; private readonly int _maxPoolSize; private readonly TimeSpan _waitForConnectionTimeout; @@ -72,7 +72,8 @@ namespace Gremlin.Net.Driver { if (TryGetConnectionFromPool(out var connection)) return ProxiedConnection(connection); - connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? WaitForConnection(); + connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? 
+ await WaitForConnectionAsync().ConfigureAwait(false); return ProxiedConnection(connection); } @@ -115,13 +116,13 @@ namespace Gremlin.Net.Driver return newConnection; } - private Connection WaitForConnection() + private async Task<Connection> WaitForConnectionAsync() { var start = DateTimeOffset.Now; var remaining = _waitForConnectionTimeout; do { - if (_newConnectionAvailable.WaitOne(remaining)) + if (await _newConnectionAvailable.WaitOneAsync(remaining).ConfigureAwait(false)) { if (TryGetConnectionFromPool(out var connection)) return connection; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs ---------------------------------------------------------------------- diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs new file mode 100644 index 0000000..26a5a58 --- /dev/null +++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs @@ -0,0 +1,169 @@ +#region License + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Gremlin.Net.Driver; +using Xunit; + +namespace Gremlin.Net.UnitTest.Driver +{ + public class AsyncAutoResetEventTests + { + private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(100); + + [Fact] + public async Task WaitOneAsync_AfterSet_CompletesSynchronously() + { + var are = new AsyncAutoResetEvent(); + + are.Set(); + var task = are.WaitOneAsync(DefaultTimeout); + + Assert.True(task.IsCompleted); + Assert.True(await task); + } + + [Fact] + public async Task MultipleWaitOneAsync_AfterSet_OnlyFirstWaitIsSuccessful() + { + var are = new AsyncAutoResetEvent(); + + are.Set(); + var task1 = are.WaitOneAsync(DefaultTimeout); + var task2 = are.WaitOneAsync(DefaultTimeout); + + Assert.True(task1.IsCompleted); + Assert.True(await task1); + Assert.False(await task2); + } + + [Fact] + public async Task MultipleWaitOneAsync_AfterMultipleSet_OnlyFirstWaitIsSuccessful() + { + var are = new AsyncAutoResetEvent(); + + are.Set(); + are.Set(); + var task1 = are.WaitOneAsync(DefaultTimeout); + var task2 = are.WaitOneAsync(DefaultTimeout); + + Assert.True(task1.IsCompleted); + Assert.True(await task1); + Assert.False(await task2); + } + + [Fact] + public async Task WaitOneAsync_SetBeforeTimeout_WaitSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + Assert.True(await task); + } + + [Fact] + public async Task Set_AfterMultipleWaitOneAsync_OnlyFirstWaitIsSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task1 = are.WaitOneAsync(DefaultTimeout); + var task2 = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + await AssertCompletesBeforeTimeoutAsync(task1, DefaultTimeout.Milliseconds + 50); + Assert.False(await task2); + } + + [Fact] + public async Task WaitOneAsync_NotSet_OnlyWaitUntilTimeout() + { + var are = new AsyncAutoResetEvent(); + + var task = 
are.WaitOneAsync(DefaultTimeout); + + await AssertCompletesBeforeTimeoutAsync(task, DefaultTimeout.Milliseconds + 50); + } + + [Fact] + public async Task WaitOneAsync_NotSet_WaitNotSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task = are.WaitOneAsync(DefaultTimeout); + + Assert.False(await task); + } + + [Fact] + public async Task WaitOneAsync_SetAfterPreviousWaitTimedOut_OnlySecondWaitSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task1 = are.WaitOneAsync(DefaultTimeout); + await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50)); + var task2 = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + Assert.False(await task1); + Assert.True(await task2); + } + + [Fact] + public async Task WaitOneAsync_SetAfterMultipleWaitsTimedOut_OnlyLastWaitSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var timedOutTasks = new List<Task<bool>>(); + for (var i = 0; i < 1000; i++) + { + timedOutTasks.Add(are.WaitOneAsync(DefaultTimeout)); + } + + await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50)); + var task2 = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + foreach (var t in timedOutTasks) + { + Assert.False(await t); + } + Assert.True(await task2); + } + + private static async Task AssertCompletesBeforeTimeoutAsync(Task task, int timeoutInMs) + { + var completedTask = await WaitForTaskOrTimeoutAsync(task, TimeSpan.FromMilliseconds(timeoutInMs)); + if (completedTask != task) + throw new Exception("Task did not complete."); + } + + private static Task<Task> WaitForTaskOrTimeoutAsync(Task task, TimeSpan timeout) + { + return Task.WhenAny(task, Task.Delay(timeout)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index dfb5824..50e84c4 100644 --- a/pom.xml +++ b/pom.xml @@ -401,6 +401,7 @@ limitations under the License. 
<exclude>**/node/node_modules/**</exclude> <exclude>**/node/node</exclude> <exclude>**/npm-debug.log</exclude> + <exclude>**/.idea/**</exclude> </excludes> <licenses> <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>