This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 388270de1c IGNITE-20749 .NET: Fix missing lock release in
ClientFailoverSocket.ConnectAsync (#2827)
388270de1c is described below
commit 388270de1c20f8b372050374e0e411ea373bd55f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Nov 13 15:26:09 2023 +0200
IGNITE-20749 .NET: Fix missing lock release in
ClientFailoverSocket.ConnectAsync (#2827)
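Fix missing lock release in `ClientFailoverSocket.ConnectAsync`: the early
return for an already-connected endpoint ran after `_socketLock.WaitAsync()`
but before the `try` block, so `_socketLock` was never released on that path.
This was causing `TestReconnectAfterFullClusterRestart` flakiness.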
Additionally:
* Refactor `ConnectAllSockets` to capture all exceptions
* Refactor `ClientSocket.ConnectAsync`:
  * Add `WaitAsync` to handle timeout for all stages
  * Ensure proper disposal of socket and stream
  * Add `CancellationTokenSource` to cancel pending tasks on timeout
---
.../dotnet/Apache.Ignite.Tests/LoggingTests.cs | 2 +-
.../dotnet/Apache.Ignite.Tests/SslTests.cs | 6 +-
.../dotnet/Apache.Ignite/ISslStreamFactory.cs | 4 +-
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 54 ++++++++++-----
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 79 +++++++++++++++-------
.../dotnet/Apache.Ignite/SslStreamFactory.cs | 7 +-
6 files changed, 102 insertions(+), 50 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
index 22cdf6827d..77e726a3b3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
@@ -57,7 +57,7 @@ public class LoggingTests
StringAssert.Contains("Connection established", log);
StringAssert.Contains("Handshake succeeded", log);
StringAssert.Contains("Trying to establish secondary connections -
awaiting 2 tasks", log);
- StringAssert.Contains("All secondary connections established", log);
+ StringAssert.Contains("2 secondary connections established, 0 failed",
log);
StringAssert.Contains("Sending request [op=TablesGet", log);
StringAssert.Contains("Sending request [op=SqlExec", log);
StringAssert.Contains("Connection closed", log);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
index 18818cc9a4..283f24cfd3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
@@ -24,6 +24,7 @@ using System.Linq;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
+using System.Threading;
using System.Threading.Tasks;
using Log;
using NUnit.Framework;
@@ -207,12 +208,13 @@ public class SslTests : IgniteTestsBase
private class NullSslStreamFactory : ISslStreamFactory
{
- public Task<SslStream?> CreateAsync(Stream stream, string targetHost)
=> Task.FromResult<SslStream?>(null);
+ public Task<SslStream?> CreateAsync(Stream stream, string targetHost,
CancellationToken cancellationToken) =>
+ Task.FromResult<SslStream?>(null);
}
private class CustomSslStreamFactory : ISslStreamFactory
{
- public async Task<SslStream?> CreateAsync(Stream stream, string
targetHost)
+ public async Task<SslStream?> CreateAsync(Stream stream, string
targetHost, CancellationToken cancellationToken)
{
var sslStream = new SslStream(
innerStream: stream,
diff --git a/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
b/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
index 9bc51c3ceb..a7053ddd9b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite;
using System.IO;
using System.Net.Security;
+using System.Threading;
using System.Threading.Tasks;
/// <summary>
@@ -33,8 +34,9 @@ public interface ISslStreamFactory
/// </summary>
/// <param name="stream">The underlying raw stream.</param>
/// <param name="targetHost">Target host.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
/// <returns>
/// SSL stream, or null if SSL is not enabled.
/// </returns>
- Task<SslStream?> CreateAsync(Stream stream, string targetHost);
+ Task<SslStream?> CreateAsync(Stream stream, string targetHost,
CancellationToken cancellationToken);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 5f2e64f4df..172bf4a1c8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -331,29 +331,49 @@ namespace Apache.Ignite.Internal
while (!_disposed)
{
- try
- {
- tasks.Clear();
+ tasks.Clear();
- foreach (var endpoint in _endpoints)
+ foreach (var endpoint in _endpoints)
+ {
+ try
{
- if (endpoint.Socket?.IsDisposed == false)
+ var connectTask = ConnectAsync(endpoint);
+ if (connectTask.IsCompleted)
{
continue;
}
- tasks.Add(ConnectAsync(endpoint).AsTask());
+ tasks.Add(connectTask.AsTask());
}
+ catch (Exception e)
+ {
+ _logger?.Warn(e, "Error while trying to establish
secondary connections: " + e.Message);
+ }
+ }
- _logger?.Debug("Trying to establish secondary connections
- awaiting {0} tasks...", tasks.Count);
-
- await Task.WhenAll(tasks).ConfigureAwait(false);
+ if (_logger?.IsEnabled(LogLevel.Debug) == true)
+ {
+ _logger.Debug("Trying to establish secondary connections -
awaiting {0} tasks...", tasks.Count);
+ }
- _logger?.Debug("All secondary connections established.");
+ // Await every task separately instead of using WhenAll to
capture exceptions and avoid extra allocations.
+ int failed = 0;
+ foreach (var task in tasks)
+ {
+ try
+ {
+ await task.ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ _logger?.Warn(e, "Error while trying to establish
secondary connections: " + e.Message);
+ failed++;
+ }
}
- catch (Exception e)
+
+ if (_logger?.IsEnabled(LogLevel.Debug) == true)
{
- _logger?.Warn(e, "Error while trying to establish
secondary connections: " + e.Message);
+ _logger.Debug($"{tasks.Count - failed} secondary
connections established, {failed} failed.");
}
if (Configuration.ReconnectInterval <= TimeSpan.Zero)
@@ -445,13 +465,13 @@ namespace Apache.Ignite.Internal
await _socketLock.WaitAsync().ConfigureAwait(false);
- if (endpoint.Socket?.IsDisposed == false)
- {
- return endpoint.Socket;
- }
-
try
{
+ if (endpoint.Socket?.IsDisposed == false)
+ {
+ return endpoint.Socket;
+ }
+
var socket = await ClientSocket.ConnectAsync(endpoint,
Configuration, this).ConfigureAwait(false);
if (_clusterId == null)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 2bb5d57a07..a0801138b0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -160,22 +160,32 @@ namespace Apache.Ignite.Internal
"Microsoft.Reliability",
"CA2000:Dispose objects before losing scope",
Justification = "NetworkStream is returned from this method in the
socket.")]
+ [SuppressMessage("Maintainability", "CA1508:Avoid dead conditional
code", Justification = "False positive")]
+ [SuppressMessage("Design", "CA1031:Do not catch general exception
types", Justification = "Reviewed")]
public static async Task<ClientSocket> ConnectAsync(
SocketEndpoint endPoint,
IgniteClientConfiguration configuration,
IClientSocketEventListener listener)
{
- var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
- {
- NoDelay = true
- };
-
+ using var cts = new CancellationTokenSource();
var logger = configuration.Logger.GetLogger(nameof(ClientSocket) +
"-" + Interlocked.Increment(ref _socketId));
+
bool connected = false;
+ Socket? socket = null;
+ Stream? stream = null;
try
{
- await
socket.ConnectAsync(endPoint.EndPoint).ConfigureAwait(false);
+ socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
+ {
+ NoDelay = true
+ };
+
+ await socket.ConnectAsync(endPoint.EndPoint, cts.Token)
+ .AsTask()
+ .WaitAsync(configuration.SocketTimeout, cts.Token)
+ .ConfigureAwait(false);
+
connected = true;
if (logger?.IsEnabled(LogLevel.Debug) == true)
@@ -186,10 +196,12 @@ namespace Apache.Ignite.Internal
Metrics.ConnectionsEstablished.Add(1);
Metrics.ConnectionsActiveIncrement();
- Stream stream = new NetworkStream(socket, ownsSocket: true);
+ stream = new NetworkStream(socket, ownsSocket: true);
if (configuration.SslStreamFactory is { } sslStreamFactory &&
- await sslStreamFactory.CreateAsync(stream,
endPoint.Host).ConfigureAwait(false) is { } sslStream)
+ await sslStreamFactory.CreateAsync(stream, endPoint.Host,
cts.Token)
+ .WaitAsync(configuration.SocketTimeout, cts.Token)
+ .ConfigureAwait(false) is { } sslStream)
{
stream = sslStream;
@@ -200,8 +212,8 @@ namespace Apache.Ignite.Internal
}
}
- var context = await HandshakeAsync(stream, endPoint.EndPoint,
configuration)
- .WaitAsync(configuration.SocketTimeout)
+ var context = await HandshakeAsync(stream, endPoint.EndPoint,
configuration, cts.Token)
+ .WaitAsync(configuration.SocketTimeout, cts.Token)
.ConfigureAwait(false);
if (logger?.IsEnabled(LogLevel.Debug) == true)
@@ -211,11 +223,26 @@ namespace Apache.Ignite.Internal
return new ClientSocket(stream, configuration, context,
listener, logger);
}
- catch (Exception e)
+ catch (Exception ex)
{
- logger?.Warn($"Connection failed before or during handshake
[remoteAddress={endPoint.EndPoint}]: {e.Message}.", e);
+ try
+ {
+ cts.Cancel();
+ socket?.Dispose();
- if (e.GetBaseException() is TimeoutException)
+ if (stream != null)
+ {
+ await stream.DisposeAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception disposeEx)
+ {
+ logger?.Warn(disposeEx, "Failed to dispose socket after
failed connection attempt: " + disposeEx.Message);
+ }
+
+ logger?.Warn(ex, $"Connection failed before or during
handshake [remoteAddress={endPoint.EndPoint}]: {ex.Message}.");
+
+ if (ex.GetBaseException() is TimeoutException)
{
Metrics.HandshakesFailedTimeout.Add(1);
}
@@ -224,9 +251,6 @@ namespace Apache.Ignite.Internal
Metrics.HandshakesFailed.Add(1);
}
- // ReSharper disable once MethodHasAsyncOverload
- socket.Dispose();
-
if (connected)
{
Metrics.ConnectionsActiveDecrement();
@@ -235,7 +259,7 @@ namespace Apache.Ignite.Internal
throw new IgniteClientConnectionException(
ErrorGroups.Client.Connection,
"Failed to connect to endpoint: " + endPoint.EndPoint,
- e);
+ ex);
}
}
@@ -315,29 +339,31 @@ namespace Apache.Ignite.Internal
/// <param name="stream">Network stream.</param>
/// <param name="endPoint">Endpoint.</param>
/// <param name="configuration">Configuration.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
private static async Task<ConnectionContext> HandshakeAsync(
Stream stream,
IPEndPoint endPoint,
- IgniteClientConfiguration configuration)
+ IgniteClientConfiguration configuration,
+ CancellationToken cancellationToken)
{
- await
stream.WriteAsync(ProtoCommon.MagicBytes).ConfigureAwait(false);
- await WriteHandshakeAsync(stream, CurrentProtocolVersion,
configuration).ConfigureAwait(false);
+ await stream.WriteAsync(ProtoCommon.MagicBytes,
cancellationToken).ConfigureAwait(false);
+ await WriteHandshakeAsync(stream, CurrentProtocolVersion,
configuration, cancellationToken).ConfigureAwait(false);
- await stream.FlushAsync().ConfigureAwait(false);
+ await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
- await CheckMagicBytesAsync(stream).ConfigureAwait(false);
+ await CheckMagicBytesAsync(stream,
cancellationToken).ConfigureAwait(false);
using var response = await ReadResponseAsync(stream, new byte[4],
CancellationToken.None).ConfigureAwait(false);
return ReadHandshakeResponse(response.GetReader(), endPoint,
GetSslInfo(stream));
}
- private static async ValueTask CheckMagicBytesAsync(Stream stream)
+ private static async ValueTask CheckMagicBytesAsync(Stream stream,
CancellationToken cancellationToken)
{
var responseMagic =
ByteArrayPool.Rent(ProtoCommon.MagicBytes.Length);
try
{
- await ReceiveBytesAsync(stream, responseMagic,
ProtoCommon.MagicBytes.Length, CancellationToken.None).ConfigureAwait(false);
+ await ReceiveBytesAsync(stream, responseMagic,
ProtoCommon.MagicBytes.Length, cancellationToken).ConfigureAwait(false);
for (var i = 0; i < ProtoCommon.MagicBytes.Length; i++)
{
@@ -484,7 +510,8 @@ namespace Apache.Ignite.Internal
private static async ValueTask WriteHandshakeAsync(
Stream stream,
ClientProtocolVersion version,
- IgniteClientConfiguration configuration)
+ IgniteClientConfiguration configuration,
+ CancellationToken token)
{
using var bufferWriter = new PooledArrayBuffer(prefixSize:
ProtoCommon.MessagePrefixSize);
WriteHandshake(bufferWriter.MessageWriter, version, configuration);
@@ -495,7 +522,7 @@ namespace Apache.Ignite.Internal
var resBuf = buf.Slice(ProtoCommon.MessagePrefixSize - 4);
WriteMessageSize(resBuf, size);
- await stream.WriteAsync(resBuf).ConfigureAwait(false);
+ await stream.WriteAsync(resBuf, token).ConfigureAwait(false);
Metrics.BytesSent.Add(resBuf.Length);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
b/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
index 5ac5a87fbc..1b57c53b4a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite;
using System.IO;
using System.Net.Security;
+using System.Threading;
using System.Threading.Tasks;
using Internal.Common;
@@ -33,16 +34,16 @@ public sealed class SslStreamFactory : ISslStreamFactory
public SslClientAuthenticationOptions? SslClientAuthenticationOptions {
get; set; }
/// <inheritdoc />
- public async Task<SslStream?> CreateAsync(Stream stream, string targetHost)
+ public async Task<SslStream?> CreateAsync(Stream stream, string
targetHost, CancellationToken cancellationToken)
{
IgniteArgumentCheck.NotNull(stream);
- var sslStream = new SslStream(stream, false, null, null);
+ var sslStream = new SslStream(stream, leaveInnerStreamOpen: false,
null, null);
var options = SslClientAuthenticationOptions ?? new
SslClientAuthenticationOptions();
options.TargetHost ??= targetHost;
- await
sslStream.AuthenticateAsClientAsync(options).ConfigureAwait(false);
+ await sslStream.AuthenticateAsClientAsync(options,
cancellationToken).ConfigureAwait(false);
return sslStream;
}