This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f1044dce fix(csharp): fix .net node redirection in tcp client (#2843)
4f1044dce is described below

commit 4f1044dcec0855cfc0781c2e06644f53c1f74b61
Author: Łukasz Zborek <[email protected]>
AuthorDate: Tue Mar 3 10:27:58 2026 +0100

    fix(csharp): fix .net node redirection in tcp client (#2843)
---
 .../ClusterRedirectionTests.cs                     | 129 +++++++++++++
 .../Fixtures/IggyClusterFixture.cs                 | 202 +++++++++++++++++++++
 foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs  |   6 +
 .../Implementations/HttpMessageStream.cs           |   9 +-
 .../IggyClient/Implementations/TcpMessageStream.cs |  77 +++++---
 5 files changed, 399 insertions(+), 24 deletions(-)

diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/ClusterRedirectionTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/ClusterRedirectionTests.cs
new file mode 100644
index 000000000..1ea2c594c
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/ClusterRedirectionTests.cs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Configuration;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Apache.Iggy.Tests.Integrations.Fixtures;
+using Shouldly;
+
+namespace Apache.Iggy.Tests.Integrations;
+
+public class ClusterRedirectionTests
+{
+    [ClassDataSource<IggyClusterFixture>(Shared = SharedType.PerAssembly)]
+    public required IggyClusterFixture Fixture { get; init; }
+
+    [Test]
+    public async Task 
ConnectToFollower_Should_ReturnClusterMetadataWithTwoNodes()
+    {
+        using var client = IggyClientFactory.CreateClient(new 
IggyClientConfigurator
+        {
+            BaseAddress = Fixture.GetFollowerAddress(),
+            Protocol = Protocol.Tcp,
+            ReconnectionSettings = new ReconnectionSettings { Enabled = false 
},
+            AutoLoginSettings = new AutoLoginSettings { Enabled = false }
+        });
+        await client.ConnectAsync();
+        await client.LoginUser("iggy", "iggy");
+
+        var metadata = await client.GetClusterMetadataAsync();
+
+        metadata.ShouldNotBeNull();
+        metadata.Name.ShouldBe("test-cluster");
+        metadata.Nodes.Length.ShouldBe(2);
+        metadata.Nodes.ShouldContain(n => n.Role == ClusterNodeRole.Leader);
+        metadata.Nodes.ShouldContain(n => n.Role == ClusterNodeRole.Follower);
+    }
+
+    [Test]
+    public async Task ConnectToFollowerWithAutoLogin_Should_RedirectToLeader()
+    {
+        using var client = IggyClientFactory.CreateClient(new 
IggyClientConfigurator
+        {
+            BaseAddress = Fixture.GetFollowerAddress(),
+            Protocol = Protocol.Tcp,
+            ReconnectionSettings = new ReconnectionSettings { Enabled = true },
+            AutoLoginSettings = new AutoLoginSettings
+            {
+                Enabled = true,
+                Username = "iggy",
+                Password = "iggy"
+            }
+        });
+        await client.ConnectAsync();
+
+        var address = client.GetCurrentAddress();
+        address.ShouldNotBeNullOrEmpty();
+        address.ShouldBe(Fixture.GetLeaderAddress());
+    }
+
+    [Test]
+    public async Task 
ConnectToFollowerWithManualLogin_Should_RedirectToLeader()
+    {
+        using var client = IggyClientFactory.CreateClient(new 
IggyClientConfigurator
+        {
+            BaseAddress = Fixture.GetFollowerAddress(),
+            Protocol = Protocol.Tcp,
+            ReconnectionSettings = new ReconnectionSettings { Enabled = true },
+            AutoLoginSettings = new AutoLoginSettings { Enabled = false }
+        });
+        await client.ConnectAsync();
+        await client.LoginUser("iggy", "iggy");
+
+        var address = client.GetCurrentAddress();
+        address.ShouldNotBeNullOrEmpty();
+        address.ShouldBe(Fixture.GetLeaderAddress());
+    }
+
+    [Test]
+    [Skip("Currently personal access token exist only on leader. Unskip when 
it will be available on follower.")]
+    public async Task 
ConnectToFollowerWithPersonalAccessToken_Should_RedirectToLeader()
+    {
+        using var leaderClient = IggyClientFactory.CreateClient(new 
IggyClientConfigurator
+        {
+            BaseAddress = Fixture.GetLeaderAddress(),
+            Protocol = Protocol.Tcp,
+            ReconnectionSettings = new ReconnectionSettings { Enabled = false 
},
+            AutoLoginSettings = new AutoLoginSettings { Enabled = false }
+        });
+        await leaderClient.ConnectAsync();
+        await leaderClient.LoginUser("iggy", "iggy");
+
+        var name = $"pat-{Guid.NewGuid():N}"[..20];
+        var pat = await leaderClient.CreatePersonalAccessTokenAsync(name, 
TimeSpan.FromHours(1));
+        pat.ShouldNotBeNull();
+
+        using var client = IggyClientFactory.CreateClient(new 
IggyClientConfigurator
+        {
+            BaseAddress = Fixture.GetFollowerAddress(),
+            Protocol = Protocol.Tcp,
+            ReconnectionSettings = new ReconnectionSettings { Enabled = true },
+            AutoLoginSettings = new AutoLoginSettings { Enabled = false }
+        });
+        await client.ConnectAsync();
+        var authResponse = await 
client.LoginWithPersonalAccessToken(pat.Token);
+
+        authResponse.ShouldNotBeNull();
+        authResponse.UserId.ShouldBeGreaterThanOrEqualTo(0);
+
+        var address = client.GetCurrentAddress();
+        address.ShouldNotBeNullOrEmpty();
+        address.ShouldBe(Fixture.GetLeaderAddress());
+    }
+}
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyClusterFixture.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyClusterFixture.cs
new file mode 100644
index 000000000..32b026a85
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyClusterFixture.cs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
+using DotNet.Testcontainers.Networks;
+using TUnit.Core.Interfaces;
+
+namespace Apache.Iggy.Tests.Integrations.Fixtures;
+
+public class IggyClusterFixture : IAsyncInitializer, IAsyncDisposable
+{
+    private const string LeaderAlias = "iggy-leader";
+    private const string FollowerAlias = "iggy-follower";
+
+    private static readonly Random Random = new();
+    private static readonly HashSet<ushort> UsedPorts = [];
+    private readonly IContainer _followerContainer;
+    private readonly ushort _followerHttpPort;
+    private readonly ushort _followerQuicPort;
+
+    private readonly ushort _followerTcpPort;
+    private readonly ushort _followerWsPort;
+    private readonly IContainer _leaderContainer;
+    private readonly ushort _leaderHttpPort;
+    private readonly ushort _leaderQuicPort;
+
+    private readonly ushort _leaderTcpPort;
+    private readonly ushort _leaderWsPort;
+
+    private readonly INetwork _network;
+
+    private string DockerImage =>
+        Environment.GetEnvironmentVariable("IGGY_SERVER_DOCKER_IMAGE") ?? 
"apache/iggy:edge";
+
+    private static string? LogDirectory =>
+        Environment.GetEnvironmentVariable("IGGY_TEST_LOGS_DIR");
+
+    public IggyClusterFixture()
+    {
+        _leaderTcpPort = GetRandomPort();
+        _leaderHttpPort = GetRandomPort();
+        _leaderQuicPort = GetRandomPort();
+        _leaderWsPort = GetRandomPort();
+        _followerTcpPort = GetRandomPort();
+        _followerHttpPort = GetRandomPort();
+        _followerQuicPort = GetRandomPort();
+        _followerWsPort = GetRandomPort();
+
+        _network = new NetworkBuilder()
+            .WithName($"iggy-cluster-{Guid.NewGuid():N}")
+            .Build();
+
+        _leaderContainer = new ContainerBuilder(DockerImage)
+            .WithName($"iggy-leader-{Guid.NewGuid():N}")
+            .WithNetwork(_network)
+            .WithNetworkAliases(LeaderAlias)
+            .WithPortBinding(_leaderTcpPort.ToString(), 
_leaderTcpPort.ToString())
+            .WithPortBinding(_leaderHttpPort.ToString(), 
_leaderHttpPort.ToString())
+            .WithEnvironment("RUST_LOG", "trace")
+            .WithEnvironment("IGGY_SYSTEM_LOGGING_LEVEL", "trace")
+            .WithEnvironment("IGGY_ROOT_USERNAME", "iggy")
+            .WithEnvironment("IGGY_ROOT_PASSWORD", "iggy")
+            .WithEnvironment("IGGY_SYSTEM_PATH", "local_data_leader")
+            .WithEnvironment("IGGY_TCP_ADDRESS", $"0.0.0.0:{_leaderTcpPort}")
+            .WithEnvironment("IGGY_HTTP_ADDRESS", $"0.0.0.0:{_leaderHttpPort}")
+            .WithEnvironment("IGGY_QUIC_ADDRESS", $"0.0.0.0:{_leaderQuicPort}")
+            .WithEnvironment("IGGY_WEBSOCKET_ADDRESS", 
$"0.0.0.0:{_leaderWsPort}")
+            .WithEnvironment("IGGY_CLUSTER_ENABLED", "true")
+            .WithEnvironment("IGGY_CLUSTER_NAME", "test-cluster")
+            .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_NAME", "leader-node")
+            .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_IP", "127.0.0.1")
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_NAME", 
"follower-node")
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_IP", "127.0.0.1")
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP", 
_followerTcpPort.ToString())
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP", 
_followerHttpPort.ToString())
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC", 
_followerQuicPort.ToString())
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET", 
_followerWsPort.ToString())
+            .WithPrivileged(true)
+            .WithCleanUp(true)
+            
.WithWaitStrategy(Wait.ForUnixContainer().UntilInternalTcpPortIsAvailable(_leaderTcpPort))
+            .Build();
+
+        _followerContainer = new ContainerBuilder(DockerImage)
+            .WithName($"iggy-follower-{Guid.NewGuid():N}")
+            .WithCommand("--follower")
+            .WithNetwork(_network)
+            .WithNetworkAliases(FollowerAlias)
+            .WithPortBinding(_followerTcpPort.ToString(), 
_followerTcpPort.ToString())
+            .WithPortBinding(_followerHttpPort.ToString(), 
_followerHttpPort.ToString())
+            .WithEnvironment("RUST_LOG", "trace")
+            .WithEnvironment("IGGY_SYSTEM_LOGGING_LEVEL", "trace")
+            .WithEnvironment("IGGY_ROOT_USERNAME", "iggy")
+            .WithEnvironment("IGGY_ROOT_PASSWORD", "iggy")
+            .WithEnvironment("IGGY_SYSTEM_PATH", "local_data_follower")
+            .WithEnvironment("IGGY_TCP_ADDRESS", $"0.0.0.0:{_followerTcpPort}")
+            .WithEnvironment("IGGY_HTTP_ADDRESS", 
$"0.0.0.0:{_followerHttpPort}")
+            .WithEnvironment("IGGY_QUIC_ADDRESS", 
$"0.0.0.0:{_followerQuicPort}")
+            .WithEnvironment("IGGY_WEBSOCKET_ADDRESS", 
$"0.0.0.0:{_followerWsPort}")
+            .WithEnvironment("IGGY_CLUSTER_ENABLED", "true")
+            .WithEnvironment("IGGY_CLUSTER_NAME", "test-cluster")
+            .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_NAME", "follower-node")
+            .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_IP", "127.0.0.1")
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_NAME", "leader-node")
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_IP", "127.0.0.1")
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP", 
_leaderTcpPort.ToString())
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP", 
_leaderHttpPort.ToString())
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC", 
_leaderQuicPort.ToString())
+            .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET", 
_leaderWsPort.ToString())
+            .WithPrivileged(true)
+            .WithCleanUp(true)
+            
.WithWaitStrategy(Wait.ForUnixContainer().UntilInternalTcpPortIsAvailable(_followerTcpPort))
+            .Build();
+    }
+
+    public async ValueTask DisposeAsync()
+    {
+        await SaveContainerLogsAsync(_leaderContainer, "leader");
+        await SaveContainerLogsAsync(_followerContainer, "follower");
+        await _followerContainer.StopAsync();
+        await _leaderContainer.StopAsync();
+        await _network.DeleteAsync();
+    }
+
+    public async Task InitializeAsync()
+    {
+        await _network.CreateAsync();
+        await Task.WhenAll(_leaderContainer.StartAsync(), 
_followerContainer.StartAsync());
+    }
+
+    public string GetLeaderAddress()
+    {
+        return $"127.0.0.1:{_leaderTcpPort}";
+    }
+
+    public string GetFollowerAddress()
+    {
+        return $"127.0.0.1:{_followerTcpPort}";
+    }
+
+    private static ushort GetRandomPort()
+    {
+        lock (UsedPorts)
+        {
+            ushort port;
+            do
+            {
+                port = (ushort)Random.Next(30000, 40000);
+            } while (!UsedPorts.Add(port));
+
+            return port;
+        }
+    }
+
+    private static async Task SaveContainerLogsAsync(IContainer container, 
string role)
+    {
+        if (string.IsNullOrEmpty(LogDirectory))
+        {
+            return;
+        }
+
+        try
+        {
+            Directory.CreateDirectory(LogDirectory);
+            var dotnetVersion = 
$"net{Environment.Version.Major}.{Environment.Version.Minor}";
+            var logFilePath = Path.Combine(LogDirectory, 
$"iggy-{role}-{dotnetVersion}-{container.Name}.log");
+
+            var (stdout, stderr) = await container.GetLogsAsync();
+
+            await using var writer = new StreamWriter(logFilePath);
+            if (!string.IsNullOrEmpty(stdout))
+            {
+                await writer.WriteLineAsync("=== STDOUT ===");
+                await writer.WriteLineAsync(stdout);
+            }
+
+            if (!string.IsNullOrEmpty(stderr))
+            {
+                await writer.WriteLineAsync("=== STDERR ===");
+                await writer.WriteLineAsync(stderr);
+            }
+        }
+        catch (Exception ex)
+        {
+            Console.WriteLine($"Failed to save {role} container logs: 
{ex.Message}");
+        }
+    }
+}
diff --git a/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs b/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
index 17d88baaa..d8b28296c 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
@@ -43,4 +43,10 @@ public interface IIggyClient : IIggyPublisher, IIggyStream, 
IIggyTopic, IIggyCon
     /// </remarks>
     /// <param name="callback">The method previously registered for connection 
event notifications to be removed.</param>
     void UnsubscribeConnectionEvents(Func<ConnectionStateChangedEventArgs, 
Task> callback);
+
+    /// <summary>
+    ///     Gets the current address of the client.
+    /// </summary>
+    /// <returns>The current address of the client.</returns>
+    string GetCurrentAddress();
 }
diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
index 958d6643e..6f1c5a0ce 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
@@ -177,7 +177,8 @@ public class HttpMessageStream : IIggyClient
         CancellationToken token = default)
     {
         var json = JsonSerializer.Serialize(
-            new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, 
DurationHelpers.ToDuration(messageExpiry), replicationFactor),
+            new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, 
DurationHelpers.ToDuration(messageExpiry),
+                replicationFactor),
             _jsonSerializerOptions);
         var data = new StringContent(json, Encoding.UTF8, "application/json");
         var response = await 
_httpClient.PutAsync($"/streams/{streamId}/topics/{topicId}", data, token);
@@ -818,6 +819,12 @@ public class HttpMessageStream : IIggyClient
     {
     }
 
+    /// <inheritdoc />
+    public string GetCurrentAddress()
+    {
+        return _httpClient.BaseAddress?.ToString() ?? string.Empty;
+    }
+
     private static async Task HandleResponseAsync(HttpResponseMessage 
response, bool shouldThrowOnGetNotFound = false)
     {
         if ((int)response.StatusCode > 300
diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
index b3223673f..93a85e13e 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
@@ -47,7 +47,7 @@ public sealed class TcpMessageStream : IIggyClient
     private readonly SemaphoreSlim _connectionSemaphore;
     private readonly ILogger<TcpMessageStream> _logger;
     private readonly SemaphoreSlim _sendingSemaphore;
-    private ClusterNode? _currentLeaderNode;
+    private string _currentAddress = string.Empty;
     private X509Certificate2Collection _customCaStore = [];
     private bool _isConnecting;
     private DateTimeOffset _lastConnectionTime;
@@ -89,6 +89,12 @@ public sealed class TcpMessageStream : IIggyClient
         _connectionEvents.Unsubscribe(callback);
     }
 
+    /// <inheritdoc />
+    public string GetCurrentAddress()
+    {
+        return _currentAddress;
+    }
+
     /// <inheritdoc />
     public async Task<StreamResponse?> CreateStreamAsync(string name, 
CancellationToken token = default)
     {
@@ -736,6 +742,13 @@ public sealed class TcpMessageStream : IIggyClient
 
         var userId = 
BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..responseBuffer.Length]);
         SetConnectionStateAsync(ConnectionState.Authenticated);
+
+        if (await RedirectAsync(token))
+        {
+            await ConnectAsync(token);
+            return await LoginUser(userName, password, token);
+        }
+
         var authResponse = new AuthResponse(userId, null);
         return authResponse;
     }
@@ -803,6 +816,7 @@ public sealed class TcpMessageStream : IIggyClient
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);
 
+        SetConnectionStateAsync(ConnectionState.Authenticating);
         var responseBuffer = await SendWithResponseAsync(payload, ct);
 
         if (responseBuffer.Length <= 1)
@@ -812,8 +826,15 @@ public sealed class TcpMessageStream : IIggyClient
 
         var userId = 
BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..4]);
 
-        //TODO: Figure out how to solve this workaround about default of 
TokenInfo
-        return new AuthResponse(userId, default);
+        SetConnectionStateAsync(ConnectionState.Authenticated);
+
+        if (await RedirectAsync(ct))
+        {
+            await ConnectAsync(ct);
+            return await LoginWithPersonalAccessToken(token, ct);
+        }
+
+        return new AuthResponse(userId, null);
     }
 
     private async Task TryEstablishConnectionAsync(CancellationToken token)
@@ -826,10 +847,12 @@ public sealed class TcpMessageStream : IIggyClient
             _stream?.Close();
             _stream?.Dispose();
 
-            var connectionAddress = _currentLeaderNode != null
-                ? $"{_currentLeaderNode.Ip}:{_currentLeaderNode.Endpoints.Tcp}"
-                : _configuration.BaseAddress;
-            var urlPortSplitter = connectionAddress.Split(":");
+            if (string.IsNullOrEmpty(_currentAddress))
+            {
+                _currentAddress = _configuration.BaseAddress;
+            }
+
+            var urlPortSplitter = _currentAddress.Split(":");
             if (urlPortSplitter.Length > 2)
             {
                 throw new InvalidBaseAddressException();
@@ -861,22 +884,6 @@ public sealed class TcpMessageStream : IIggyClient
                         _configuration.AutoLoginSettings.Username);
                     await LoginUser(_configuration.AutoLoginSettings.Username,
                         _configuration.AutoLoginSettings.Password, token);
-
-                    _currentLeaderNode = await 
GetCurrentLeaderNodeAsync(token);
-                    if (_currentLeaderNode == null)
-                    {
-                        break;
-                    }
-
-                    var currentAddress = 
$"{_currentLeaderNode.Ip}:{_currentLeaderNode.Endpoints.Tcp}";
-                    if (currentAddress == _configuration.BaseAddress)
-                    {
-                        break;
-                    }
-
-                    _logger.LogInformation("Leader address changed. Trying to 
reconnect to {Address}",
-                        currentAddress);
-                    continue;
                 }
 
                 break;
@@ -1204,4 +1211,28 @@ public sealed class TcpMessageStream : IIggyClient
 
         return false;
     }
+
+    private async Task<bool> RedirectAsync(CancellationToken token)
+    {
+        var currentLeaderNode = await GetCurrentLeaderNodeAsync(token);
+        if (currentLeaderNode == null)
+        {
+            return false;
+        }
+
+        var leaderAddress = 
$"{currentLeaderNode.Ip}:{currentLeaderNode.Endpoints.Tcp}";
+        if (leaderAddress == _currentAddress)
+        {
+            return false;
+        }
+
+        _currentAddress = leaderAddress;
+
+        _logger.LogInformation("Leader address changed. Trying to reconnect to 
{Address}",
+            leaderAddress);
+
+        _stream.Close();
+        SetConnectionStateAsync(ConnectionState.Disconnected);
+        return true;
+    }
 }

Reply via email to