This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 388270de1c IGNITE-20749 .NET: Fix missing lock release in ClientFailoverSocket.ConnectAsync (#2827)
388270de1c is described below

commit 388270de1c20f8b372050374e0e411ea373bd55f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Nov 13 15:26:09 2023 +0200

    IGNITE-20749 .NET: Fix missing lock release in ClientFailoverSocket.ConnectAsync (#2827)
    
    Fix missing lock release in `ClientFailoverSocket.ConnectAsync` (was causing `TestReconnectAfterFullClusterRestart` flakiness).
    
    Additionally:
    * Refactor `ConnectAllSockets` to capture all exceptions
    * Refactor `ClientSocket.ConnectAsync`:
      * Add `WaitAsync` to handle timeout for all stages
      * Ensure proper disposal of socket and stream
      * Add `CancellationTokenSource` to cancel pending tasks on timeout
---
 .../dotnet/Apache.Ignite.Tests/LoggingTests.cs     |  2 +-
 .../dotnet/Apache.Ignite.Tests/SslTests.cs         |  6 +-
 .../dotnet/Apache.Ignite/ISslStreamFactory.cs      |  4 +-
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 54 ++++++++++-----
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  | 79 +++++++++++++++-------
 .../dotnet/Apache.Ignite/SslStreamFactory.cs       |  7 +-
 6 files changed, 102 insertions(+), 50 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
index 22cdf6827d..77e726a3b3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
@@ -57,7 +57,7 @@ public class LoggingTests
         StringAssert.Contains("Connection established", log);
         StringAssert.Contains("Handshake succeeded", log);
         StringAssert.Contains("Trying to establish secondary connections - 
awaiting 2 tasks", log);
-        StringAssert.Contains("All secondary connections established", log);
+        StringAssert.Contains("2 secondary connections established, 0 failed", 
log);
         StringAssert.Contains("Sending request [op=TablesGet", log);
         StringAssert.Contains("Sending request [op=SqlExec", log);
         StringAssert.Contains("Connection closed", log);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
index 18818cc9a4..283f24cfd3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
@@ -24,6 +24,7 @@ using System.Linq;
 using System.Net.Security;
 using System.Security.Authentication;
 using System.Security.Cryptography.X509Certificates;
+using System.Threading;
 using System.Threading.Tasks;
 using Log;
 using NUnit.Framework;
@@ -207,12 +208,13 @@ public class SslTests : IgniteTestsBase
 
     private class NullSslStreamFactory : ISslStreamFactory
     {
-        public Task<SslStream?> CreateAsync(Stream stream, string targetHost) 
=> Task.FromResult<SslStream?>(null);
+        public Task<SslStream?> CreateAsync(Stream stream, string targetHost, 
CancellationToken cancellationToken) =>
+            Task.FromResult<SslStream?>(null);
     }
 
     private class CustomSslStreamFactory : ISslStreamFactory
     {
-        public async Task<SslStream?> CreateAsync(Stream stream, string 
targetHost)
+        public async Task<SslStream?> CreateAsync(Stream stream, string 
targetHost, CancellationToken cancellationToken)
         {
             var sslStream = new SslStream(
                 innerStream: stream,
diff --git a/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs 
b/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
index 9bc51c3ceb..a7053ddd9b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ISslStreamFactory.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite;
 
 using System.IO;
 using System.Net.Security;
+using System.Threading;
 using System.Threading.Tasks;
 
 /// <summary>
@@ -33,8 +34,9 @@ public interface ISslStreamFactory
     /// </summary>
     /// <param name="stream">The underlying raw stream.</param>
     /// <param name="targetHost">Target host.</param>
+    /// <param name="cancellationToken">Cancellation token.</param>
     /// <returns>
     /// SSL stream, or null if SSL is not enabled.
     /// </returns>
-    Task<SslStream?> CreateAsync(Stream stream, string targetHost);
+    Task<SslStream?> CreateAsync(Stream stream, string targetHost, 
CancellationToken cancellationToken);
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 5f2e64f4df..172bf4a1c8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -331,29 +331,49 @@ namespace Apache.Ignite.Internal
 
             while (!_disposed)
             {
-                try
-                {
-                    tasks.Clear();
+                tasks.Clear();
 
-                    foreach (var endpoint in _endpoints)
+                foreach (var endpoint in _endpoints)
+                {
+                    try
                     {
-                        if (endpoint.Socket?.IsDisposed == false)
+                        var connectTask = ConnectAsync(endpoint);
+                        if (connectTask.IsCompleted)
                         {
                             continue;
                         }
 
-                        tasks.Add(ConnectAsync(endpoint).AsTask());
+                        tasks.Add(connectTask.AsTask());
                     }
+                    catch (Exception e)
+                    {
+                        _logger?.Warn(e, "Error while trying to establish 
secondary connections: " + e.Message);
+                    }
+                }
 
-                    _logger?.Debug("Trying to establish secondary connections 
- awaiting {0} tasks...", tasks.Count);
-
-                    await Task.WhenAll(tasks).ConfigureAwait(false);
+                if (_logger?.IsEnabled(LogLevel.Debug) == true)
+                {
+                    _logger.Debug("Trying to establish secondary connections - 
awaiting {0} tasks...", tasks.Count);
+                }
 
-                    _logger?.Debug("All secondary connections established.");
+                // Await every task separately instead of using WhenAll to 
capture exceptions and avoid extra allocations.
+                int failed = 0;
+                foreach (var task in tasks)
+                {
+                    try
+                    {
+                        await task.ConfigureAwait(false);
+                    }
+                    catch (Exception e)
+                    {
+                        _logger?.Warn(e, "Error while trying to establish 
secondary connections: " + e.Message);
+                        failed++;
+                    }
                 }
-                catch (Exception e)
+
+                if (_logger?.IsEnabled(LogLevel.Debug) == true)
                 {
-                    _logger?.Warn(e, "Error while trying to establish 
secondary connections: " + e.Message);
+                    _logger.Debug($"{tasks.Count - failed} secondary 
connections established, {failed} failed.");
                 }
 
                 if (Configuration.ReconnectInterval <= TimeSpan.Zero)
@@ -445,13 +465,13 @@ namespace Apache.Ignite.Internal
 
             await _socketLock.WaitAsync().ConfigureAwait(false);
 
-            if (endpoint.Socket?.IsDisposed == false)
-            {
-                return endpoint.Socket;
-            }
-
             try
             {
+                if (endpoint.Socket?.IsDisposed == false)
+                {
+                    return endpoint.Socket;
+                }
+
                 var socket = await ClientSocket.ConnectAsync(endpoint, 
Configuration, this).ConfigureAwait(false);
 
                 if (_clusterId == null)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 2bb5d57a07..a0801138b0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -160,22 +160,32 @@ namespace Apache.Ignite.Internal
             "Microsoft.Reliability",
             "CA2000:Dispose objects before losing scope",
             Justification = "NetworkStream is returned from this method in the 
socket.")]
+        [SuppressMessage("Maintainability", "CA1508:Avoid dead conditional 
code", Justification = "False positive")]
+        [SuppressMessage("Design", "CA1031:Do not catch general exception 
types", Justification = "Reviewed")]
         public static async Task<ClientSocket> ConnectAsync(
             SocketEndpoint endPoint,
             IgniteClientConfiguration configuration,
             IClientSocketEventListener listener)
         {
-            var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
-            {
-                NoDelay = true
-            };
-
+            using var cts = new CancellationTokenSource();
             var logger = configuration.Logger.GetLogger(nameof(ClientSocket) + 
"-" + Interlocked.Increment(ref _socketId));
+
             bool connected = false;
+            Socket? socket = null;
+            Stream? stream = null;
 
             try
             {
-                await 
socket.ConnectAsync(endPoint.EndPoint).ConfigureAwait(false);
+                socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
+                {
+                    NoDelay = true
+                };
+
+                await socket.ConnectAsync(endPoint.EndPoint, cts.Token)
+                    .AsTask()
+                    .WaitAsync(configuration.SocketTimeout, cts.Token)
+                    .ConfigureAwait(false);
+
                 connected = true;
 
                 if (logger?.IsEnabled(LogLevel.Debug) == true)
@@ -186,10 +196,12 @@ namespace Apache.Ignite.Internal
                 Metrics.ConnectionsEstablished.Add(1);
                 Metrics.ConnectionsActiveIncrement();
 
-                Stream stream = new NetworkStream(socket, ownsSocket: true);
+                stream = new NetworkStream(socket, ownsSocket: true);
 
                 if (configuration.SslStreamFactory is { } sslStreamFactory &&
-                    await sslStreamFactory.CreateAsync(stream, 
endPoint.Host).ConfigureAwait(false) is { } sslStream)
+                    await sslStreamFactory.CreateAsync(stream, endPoint.Host, 
cts.Token)
+                        .WaitAsync(configuration.SocketTimeout, cts.Token)
+                        .ConfigureAwait(false) is { } sslStream)
                 {
                     stream = sslStream;
 
@@ -200,8 +212,8 @@ namespace Apache.Ignite.Internal
                     }
                 }
 
-                var context = await HandshakeAsync(stream, endPoint.EndPoint, 
configuration)
-                    .WaitAsync(configuration.SocketTimeout)
+                var context = await HandshakeAsync(stream, endPoint.EndPoint, 
configuration, cts.Token)
+                    .WaitAsync(configuration.SocketTimeout, cts.Token)
                     .ConfigureAwait(false);
 
                 if (logger?.IsEnabled(LogLevel.Debug) == true)
@@ -211,11 +223,26 @@ namespace Apache.Ignite.Internal
 
                 return new ClientSocket(stream, configuration, context, 
listener, logger);
             }
-            catch (Exception e)
+            catch (Exception ex)
             {
-                logger?.Warn($"Connection failed before or during handshake 
[remoteAddress={endPoint.EndPoint}]: {e.Message}.", e);
+                try
+                {
+                    cts.Cancel();
+                    socket?.Dispose();
 
-                if (e.GetBaseException() is TimeoutException)
+                    if (stream != null)
+                    {
+                        await stream.DisposeAsync().ConfigureAwait(false);
+                    }
+                }
+                catch (Exception disposeEx)
+                {
+                    logger?.Warn(disposeEx, "Failed to dispose socket after 
failed connection attempt: " + disposeEx.Message);
+                }
+
+                logger?.Warn(ex, $"Connection failed before or during 
handshake [remoteAddress={endPoint.EndPoint}]: {ex.Message}.");
+
+                if (ex.GetBaseException() is TimeoutException)
                 {
                     Metrics.HandshakesFailedTimeout.Add(1);
                 }
@@ -224,9 +251,6 @@ namespace Apache.Ignite.Internal
                     Metrics.HandshakesFailed.Add(1);
                 }
 
-                // ReSharper disable once MethodHasAsyncOverload
-                socket.Dispose();
-
                 if (connected)
                 {
                     Metrics.ConnectionsActiveDecrement();
@@ -235,7 +259,7 @@ namespace Apache.Ignite.Internal
                 throw new IgniteClientConnectionException(
                     ErrorGroups.Client.Connection,
                     "Failed to connect to endpoint: " + endPoint.EndPoint,
-                    e);
+                    ex);
             }
         }
 
@@ -315,29 +339,31 @@ namespace Apache.Ignite.Internal
         /// <param name="stream">Network stream.</param>
         /// <param name="endPoint">Endpoint.</param>
         /// <param name="configuration">Configuration.</param>
+        /// <param name="cancellationToken">Cancellation token.</param>
         private static async Task<ConnectionContext> HandshakeAsync(
             Stream stream,
             IPEndPoint endPoint,
-            IgniteClientConfiguration configuration)
+            IgniteClientConfiguration configuration,
+            CancellationToken cancellationToken)
         {
-            await 
stream.WriteAsync(ProtoCommon.MagicBytes).ConfigureAwait(false);
-            await WriteHandshakeAsync(stream, CurrentProtocolVersion, 
configuration).ConfigureAwait(false);
+            await stream.WriteAsync(ProtoCommon.MagicBytes, 
cancellationToken).ConfigureAwait(false);
+            await WriteHandshakeAsync(stream, CurrentProtocolVersion, 
configuration, cancellationToken).ConfigureAwait(false);
 
-            await stream.FlushAsync().ConfigureAwait(false);
+            await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
 
-            await CheckMagicBytesAsync(stream).ConfigureAwait(false);
+            await CheckMagicBytesAsync(stream, 
cancellationToken).ConfigureAwait(false);
 
             using var response = await ReadResponseAsync(stream, new byte[4], 
CancellationToken.None).ConfigureAwait(false);
             return ReadHandshakeResponse(response.GetReader(), endPoint, 
GetSslInfo(stream));
         }
 
-        private static async ValueTask CheckMagicBytesAsync(Stream stream)
+        private static async ValueTask CheckMagicBytesAsync(Stream stream, 
CancellationToken cancellationToken)
         {
             var responseMagic = 
ByteArrayPool.Rent(ProtoCommon.MagicBytes.Length);
 
             try
             {
-                await ReceiveBytesAsync(stream, responseMagic, 
ProtoCommon.MagicBytes.Length, CancellationToken.None).ConfigureAwait(false);
+                await ReceiveBytesAsync(stream, responseMagic, 
ProtoCommon.MagicBytes.Length, cancellationToken).ConfigureAwait(false);
 
                 for (var i = 0; i < ProtoCommon.MagicBytes.Length; i++)
                 {
@@ -484,7 +510,8 @@ namespace Apache.Ignite.Internal
         private static async ValueTask WriteHandshakeAsync(
             Stream stream,
             ClientProtocolVersion version,
-            IgniteClientConfiguration configuration)
+            IgniteClientConfiguration configuration,
+            CancellationToken token)
         {
             using var bufferWriter = new PooledArrayBuffer(prefixSize: 
ProtoCommon.MessagePrefixSize);
             WriteHandshake(bufferWriter.MessageWriter, version, configuration);
@@ -495,7 +522,7 @@ namespace Apache.Ignite.Internal
             var resBuf = buf.Slice(ProtoCommon.MessagePrefixSize - 4);
             WriteMessageSize(resBuf, size);
 
-            await stream.WriteAsync(resBuf).ConfigureAwait(false);
+            await stream.WriteAsync(resBuf, token).ConfigureAwait(false);
             Metrics.BytesSent.Add(resBuf.Length);
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs 
b/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
index 5ac5a87fbc..1b57c53b4a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/SslStreamFactory.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite;
 
 using System.IO;
 using System.Net.Security;
+using System.Threading;
 using System.Threading.Tasks;
 using Internal.Common;
 
@@ -33,16 +34,16 @@ public sealed class SslStreamFactory : ISslStreamFactory
     public SslClientAuthenticationOptions? SslClientAuthenticationOptions { 
get; set; }
 
     /// <inheritdoc />
-    public async Task<SslStream?> CreateAsync(Stream stream, string targetHost)
+    public async Task<SslStream?> CreateAsync(Stream stream, string 
targetHost, CancellationToken cancellationToken)
     {
         IgniteArgumentCheck.NotNull(stream);
 
-        var sslStream = new SslStream(stream, false, null, null);
+        var sslStream = new SslStream(stream, leaveInnerStreamOpen: false, 
null, null);
 
         var options = SslClientAuthenticationOptions ?? new 
SslClientAuthenticationOptions();
         options.TargetHost ??= targetHost;
 
-        await 
sslStream.AuthenticateAsClientAsync(options).ConfigureAwait(false);
+        await sslStream.AuthenticateAsClientAsync(options, 
cancellationToken).ConfigureAwait(false);
 
         return sslStream;
     }

Reply via email to