This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 4f1044dce fix(csharp): fix .net node redirection in tcp client (#2843)
4f1044dce is described below
commit 4f1044dcec0855cfc0781c2e06644f53c1f74b61
Author: Łukasz Zborek <[email protected]>
AuthorDate: Tue Mar 3 10:27:58 2026 +0100
fix(csharp): fix .net node redirection in tcp client (#2843)
---
.../ClusterRedirectionTests.cs | 129 +++++++++++++
.../Fixtures/IggyClusterFixture.cs | 202 +++++++++++++++++++++
foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs | 6 +
.../Implementations/HttpMessageStream.cs | 9 +-
.../IggyClient/Implementations/TcpMessageStream.cs | 77 +++++---
5 files changed, 399 insertions(+), 24 deletions(-)
diff --git
a/foreign/csharp/Iggy_SDK.Tests.Integration/ClusterRedirectionTests.cs
b/foreign/csharp/Iggy_SDK.Tests.Integration/ClusterRedirectionTests.cs
new file mode 100644
index 000000000..1ea2c594c
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/ClusterRedirectionTests.cs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Configuration;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Apache.Iggy.Tests.Integrations.Fixtures;
+using Shouldly;
+
+namespace Apache.Iggy.Tests.Integrations;
+
+/// <summary>
+/// Integration tests for the TCP client running against the two-node cluster supplied by
+/// <see cref="IggyClusterFixture" />. The client under test is always pointed at the
+/// follower address first.
+/// </summary>
+public class ClusterRedirectionTests
+{
+    [ClassDataSource<IggyClusterFixture>(Shared = SharedType.PerAssembly)]
+    public required IggyClusterFixture Fixture { get; init; }
+
+    /// <summary>
+    /// After a manual login through the follower address, the cluster metadata should name
+    /// "test-cluster" and list two nodes: one leader and one follower.
+    /// </summary>
+    [Test]
+    public async Task ConnectToFollower_Should_ReturnClusterMetadataWithTwoNodes()
+    {
+        var configuration = TcpConfiguration(Fixture.GetFollowerAddress(), false,
+            new AutoLoginSettings { Enabled = false });
+        using var client = IggyClientFactory.CreateClient(configuration);
+        await client.ConnectAsync();
+        await client.LoginUser("iggy", "iggy");
+
+        var clusterMetadata = await client.GetClusterMetadataAsync();
+
+        clusterMetadata.ShouldNotBeNull();
+        clusterMetadata.Name.ShouldBe("test-cluster");
+        clusterMetadata.Nodes.Length.ShouldBe(2);
+        clusterMetadata.Nodes.ShouldContain(node => node.Role == ClusterNodeRole.Leader);
+        clusterMetadata.Nodes.ShouldContain(node => node.Role == ClusterNodeRole.Follower);
+    }
+
+    /// <summary>
+    /// With auto-login enabled, connecting to the follower should leave the client on the
+    /// leader address.
+    /// </summary>
+    [Test]
+    public async Task ConnectToFollowerWithAutoLogin_Should_RedirectToLeader()
+    {
+        var autoLogin = new AutoLoginSettings
+        {
+            Enabled = true,
+            Username = "iggy",
+            Password = "iggy"
+        };
+        using var client = IggyClientFactory.CreateClient(
+            TcpConfiguration(Fixture.GetFollowerAddress(), true, autoLogin));
+        await client.ConnectAsync();
+
+        ShouldBeLeaderAddress(client.GetCurrentAddress());
+    }
+
+    /// <summary>
+    /// With auto-login disabled, an explicit user login on the follower should leave the
+    /// client on the leader address.
+    /// </summary>
+    [Test]
+    public async Task ConnectToFollowerWithManualLogin_Should_RedirectToLeader()
+    {
+        using var client = IggyClientFactory.CreateClient(
+            TcpConfiguration(Fixture.GetFollowerAddress(), true, new AutoLoginSettings { Enabled = false }));
+        await client.ConnectAsync();
+        await client.LoginUser("iggy", "iggy");
+
+        ShouldBeLeaderAddress(client.GetCurrentAddress());
+    }
+
+    /// <summary>
+    /// Creates a personal access token on the leader, then logs in with it through the
+    /// follower and expects the client to end up on the leader address.
+    /// </summary>
+    [Test]
+    [Skip("Currently personal access token exist only on leader. Unskip when it will be available on follower.")]
+    public async Task ConnectToFollowerWithPersonalAccessToken_Should_RedirectToLeader()
+    {
+        using var leaderClient = IggyClientFactory.CreateClient(
+            TcpConfiguration(Fixture.GetLeaderAddress(), false, new AutoLoginSettings { Enabled = false }));
+        await leaderClient.ConnectAsync();
+        await leaderClient.LoginUser("iggy", "iggy");
+
+        // "pat-" plus the first 16 hex characters of a fresh GUID: 20 characters in total.
+        var tokenName = $"pat-{Guid.NewGuid():N}"[..20];
+        var pat = await leaderClient.CreatePersonalAccessTokenAsync(tokenName, TimeSpan.FromHours(1));
+        pat.ShouldNotBeNull();
+
+        using var client = IggyClientFactory.CreateClient(
+            TcpConfiguration(Fixture.GetFollowerAddress(), true, new AutoLoginSettings { Enabled = false }));
+        await client.ConnectAsync();
+        var authResponse = await client.LoginWithPersonalAccessToken(pat.Token);
+
+        authResponse.ShouldNotBeNull();
+        authResponse.UserId.ShouldBeGreaterThanOrEqualTo(0);
+
+        ShouldBeLeaderAddress(client.GetCurrentAddress());
+    }
+
+    /// <summary>
+    /// Builds the TCP client configuration shared by all tests in this class.
+    /// </summary>
+    /// <param name="address">Address the client connects to first.</param>
+    /// <param name="reconnect">Value for <c>ReconnectionSettings.Enabled</c>.</param>
+    /// <param name="autoLogin">Auto-login settings to use.</param>
+    private static IggyClientConfigurator TcpConfiguration(string address, bool reconnect,
+        AutoLoginSettings autoLogin)
+    {
+        return new IggyClientConfigurator
+        {
+            BaseAddress = address,
+            Protocol = Protocol.Tcp,
+            ReconnectionSettings = new ReconnectionSettings { Enabled = reconnect },
+            AutoLoginSettings = autoLogin
+        };
+    }
+
+    /// <summary>
+    /// Asserts that <paramref name="address" /> is non-empty and equals the fixture's leader address.
+    /// </summary>
+    private void ShouldBeLeaderAddress(string address)
+    {
+        address.ShouldNotBeNullOrEmpty();
+        address.ShouldBe(Fixture.GetLeaderAddress());
+    }
+}
diff --git
a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyClusterFixture.cs
b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyClusterFixture.cs
new file mode 100644
index 000000000..32b026a85
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyClusterFixture.cs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
+using DotNet.Testcontainers.Networks;
+using TUnit.Core.Interfaces;
+
+namespace Apache.Iggy.Tests.Integrations.Fixtures;
+
+public class IggyClusterFixture : IAsyncInitializer, IAsyncDisposable
+{
+ // Network aliases given to the two containers on the shared Docker network.
+ private const string LeaderAlias = "iggy-leader";
+ private const string FollowerAlias = "iggy-follower";
+
+ // Shared by all fixture instances: the random source for GetRandomPort and the set of
+ // ports it has already handed out. UsedPorts is also the lock object in GetRandomPort.
+ private static readonly Random Random = new();
+ private static readonly HashSet<ushort> UsedPorts = [];
+ private readonly IContainer _followerContainer;
+ private readonly ushort _followerHttpPort;
+ private readonly ushort _followerQuicPort;
+
+ private readonly ushort _followerTcpPort;
+ private readonly ushort _followerWsPort;
+ private readonly IContainer _leaderContainer;
+ private readonly ushort _leaderHttpPort;
+ private readonly ushort _leaderQuicPort;
+
+ private readonly ushort _leaderTcpPort;
+ private readonly ushort _leaderWsPort;
+
+ private readonly INetwork _network;
+
+ /// <summary>
+ /// Server image used for both nodes: the value of IGGY_SERVER_DOCKER_IMAGE, or
+ /// "apache/iggy:edge" when that variable is not set.
+ /// </summary>
+ private string DockerImage =>
+ Environment.GetEnvironmentVariable("IGGY_SERVER_DOCKER_IMAGE") ??
"apache/iggy:edge";
+
+ /// <summary>
+ /// Directory for container log dumps, read from IGGY_TEST_LOGS_DIR. When it is null or
+ /// empty, SaveContainerLogsAsync returns without writing anything.
+ /// </summary>
+ private static string? LogDirectory =>
+ Environment.GetEnvironmentVariable("IGGY_TEST_LOGS_DIR");
+
+ /// <summary>
+ /// Reserves eight distinct random ports (TCP, HTTP, QUIC and WebSocket for each node) and
+ /// builds the network plus the leader and follower container definitions.
+ /// The network is created and the containers are started later, in InitializeAsync.
+ /// </summary>
+ public IggyClusterFixture()
+ {
+ _leaderTcpPort = GetRandomPort();
+ _leaderHttpPort = GetRandomPort();
+ _leaderQuicPort = GetRandomPort();
+ _leaderWsPort = GetRandomPort();
+ _followerTcpPort = GetRandomPort();
+ _followerHttpPort = GetRandomPort();
+ _followerQuicPort = GetRandomPort();
+ _followerWsPort = GetRandomPort();
+
+ _network = new NetworkBuilder()
+ .WithName($"iggy-cluster-{Guid.NewGuid():N}")
+ .Build();
+
+ // Leader node. Only the TCP and HTTP ports are bound on the host, using the same port
+ // number on both sides; the QUIC and WebSocket ports are only passed to the server
+ // through its environment. The follower is described via IGGY_CLUSTER_NODE_OTHERS_0_*.
+ // The wait strategy targets the leader's TCP port inside the container.
+ _leaderContainer = new ContainerBuilder(DockerImage)
+ .WithName($"iggy-leader-{Guid.NewGuid():N}")
+ .WithNetwork(_network)
+ .WithNetworkAliases(LeaderAlias)
+ .WithPortBinding(_leaderTcpPort.ToString(),
_leaderTcpPort.ToString())
+ .WithPortBinding(_leaderHttpPort.ToString(),
_leaderHttpPort.ToString())
+ .WithEnvironment("RUST_LOG", "trace")
+ .WithEnvironment("IGGY_SYSTEM_LOGGING_LEVEL", "trace")
+ .WithEnvironment("IGGY_ROOT_USERNAME", "iggy")
+ .WithEnvironment("IGGY_ROOT_PASSWORD", "iggy")
+ .WithEnvironment("IGGY_SYSTEM_PATH", "local_data_leader")
+ .WithEnvironment("IGGY_TCP_ADDRESS", $"0.0.0.0:{_leaderTcpPort}")
+ .WithEnvironment("IGGY_HTTP_ADDRESS", $"0.0.0.0:{_leaderHttpPort}")
+ .WithEnvironment("IGGY_QUIC_ADDRESS", $"0.0.0.0:{_leaderQuicPort}")
+ .WithEnvironment("IGGY_WEBSOCKET_ADDRESS",
$"0.0.0.0:{_leaderWsPort}")
+ .WithEnvironment("IGGY_CLUSTER_ENABLED", "true")
+ .WithEnvironment("IGGY_CLUSTER_NAME", "test-cluster")
+ .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_NAME", "leader-node")
+ .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_IP", "127.0.0.1")
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_NAME",
"follower-node")
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_IP", "127.0.0.1")
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP",
_followerTcpPort.ToString())
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP",
_followerHttpPort.ToString())
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC",
_followerQuicPort.ToString())
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET",
_followerWsPort.ToString())
+ .WithPrivileged(true)
+ .WithCleanUp(true)
+
.WithWaitStrategy(Wait.ForUnixContainer().UntilInternalTcpPortIsAvailable(_leaderTcpPort))
+ .Build();
+
+ // Follower node. Same layout as the leader with the roles swapped: it is launched with
+ // the "--follower" command, uses its own data path and ports, and lists the leader as
+ // IGGY_CLUSTER_NODE_OTHERS_0_*.
+ _followerContainer = new ContainerBuilder(DockerImage)
+ .WithName($"iggy-follower-{Guid.NewGuid():N}")
+ .WithCommand("--follower")
+ .WithNetwork(_network)
+ .WithNetworkAliases(FollowerAlias)
+ .WithPortBinding(_followerTcpPort.ToString(),
_followerTcpPort.ToString())
+ .WithPortBinding(_followerHttpPort.ToString(),
_followerHttpPort.ToString())
+ .WithEnvironment("RUST_LOG", "trace")
+ .WithEnvironment("IGGY_SYSTEM_LOGGING_LEVEL", "trace")
+ .WithEnvironment("IGGY_ROOT_USERNAME", "iggy")
+ .WithEnvironment("IGGY_ROOT_PASSWORD", "iggy")
+ .WithEnvironment("IGGY_SYSTEM_PATH", "local_data_follower")
+ .WithEnvironment("IGGY_TCP_ADDRESS", $"0.0.0.0:{_followerTcpPort}")
+ .WithEnvironment("IGGY_HTTP_ADDRESS",
$"0.0.0.0:{_followerHttpPort}")
+ .WithEnvironment("IGGY_QUIC_ADDRESS",
$"0.0.0.0:{_followerQuicPort}")
+ .WithEnvironment("IGGY_WEBSOCKET_ADDRESS",
$"0.0.0.0:{_followerWsPort}")
+ .WithEnvironment("IGGY_CLUSTER_ENABLED", "true")
+ .WithEnvironment("IGGY_CLUSTER_NAME", "test-cluster")
+ .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_NAME", "follower-node")
+ .WithEnvironment("IGGY_CLUSTER_NODE_CURRENT_IP", "127.0.0.1")
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_NAME", "leader-node")
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_IP", "127.0.0.1")
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP",
_leaderTcpPort.ToString())
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP",
_leaderHttpPort.ToString())
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC",
_leaderQuicPort.ToString())
+ .WithEnvironment("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET",
_leaderWsPort.ToString())
+ .WithPrivileged(true)
+ .WithCleanUp(true)
+
.WithWaitStrategy(Wait.ForUnixContainer().UntilInternalTcpPortIsAvailable(_followerTcpPort))
+ .Build();
+ }
+
+ /// <summary>
+ /// Saves the logs of both containers (best effort), then stops the follower, stops the
+ /// leader and deletes the network.
+ /// </summary>
+ /// <remarks>
+ /// The teardown steps are chained with try/finally so that a failure while stopping one
+ /// container does not leave the other container running or the network behind. If several
+ /// steps fail, the exception of the last failing step is the one that propagates.
+ /// </remarks>
+ public async ValueTask DisposeAsync()
+ {
+     await SaveContainerLogsAsync(_leaderContainer, "leader");
+     await SaveContainerLogsAsync(_followerContainer, "follower");
+
+     try
+     {
+         await _followerContainer.StopAsync();
+     }
+     finally
+     {
+         try
+         {
+             await _leaderContainer.StopAsync();
+         }
+         finally
+         {
+             await _network.DeleteAsync();
+         }
+     }
+ }
+
+ /// <summary>
+ /// Creates the network, then starts the leader and follower containers concurrently and
+ /// waits for both starts to complete.
+ /// </summary>
+ public async Task InitializeAsync()
+ {
+     await _network.CreateAsync();
+
+     var leaderStart = _leaderContainer.StartAsync();
+     var followerStart = _followerContainer.StartAsync();
+     await Task.WhenAll(leaderStart, followerStart);
+ }
+
+ /// <summary>
+ /// Returns the leader's TCP address as "127.0.0.1:&lt;leader TCP port&gt;".
+ /// </summary>
+ public string GetLeaderAddress() => $"127.0.0.1:{_leaderTcpPort}";
+
+ /// <summary>
+ /// Returns the follower's TCP address as "127.0.0.1:&lt;follower TCP port&gt;".
+ /// </summary>
+ public string GetFollowerAddress() => $"127.0.0.1:{_followerTcpPort}";
+
+ /// <summary>
+ /// Draws a random port in the range 30000..39999 that has not been returned by this
+ /// method before and records it in <see cref="UsedPorts" />.
+ /// </summary>
+ /// <returns>A port number not previously handed out by this method.</returns>
+ private static ushort GetRandomPort()
+ {
+     lock (UsedPorts)
+     {
+         while (true)
+         {
+             var candidate = (ushort)Random.Next(30000, 40000);
+
+             // HashSet.Add returns false for a value that is already present: draw again.
+             if (UsedPorts.Add(candidate))
+             {
+                 return candidate;
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Best-effort dump of a container's stdout and stderr into a file under
+ /// <see cref="LogDirectory" />. Returns immediately when no log directory is configured.
+ /// Any exception is reported on the console and swallowed.
+ /// </summary>
+ /// <param name="container">Container whose logs are collected.</param>
+ /// <param name="role">Label used in the log file name and in the failure message.</param>
+ private static async Task SaveContainerLogsAsync(IContainer container, string role)
+ {
+     if (string.IsNullOrEmpty(LogDirectory))
+     {
+         return;
+     }
+
+     try
+     {
+         Directory.CreateDirectory(LogDirectory);
+         var dotnetVersion = $"net{Environment.Version.Major}.{Environment.Version.Minor}";
+
+         // NOTE(review): the reported container name may start with '/', which would turn
+         // the file name into a nested path inside a directory that does not exist.
+         // Strip it defensively; this is a no-op when there is no leading slash.
+         var containerName = container.Name.TrimStart('/');
+         var logFilePath = Path.Combine(LogDirectory, $"iggy-{role}-{dotnetVersion}-{containerName}.log");
+
+         var (stdout, stderr) = await container.GetLogsAsync();
+
+         await using var writer = new StreamWriter(logFilePath);
+         if (!string.IsNullOrEmpty(stdout))
+         {
+             await writer.WriteLineAsync("=== STDOUT ===");
+             await writer.WriteLineAsync(stdout);
+         }
+
+         if (!string.IsNullOrEmpty(stderr))
+         {
+             await writer.WriteLineAsync("=== STDERR ===");
+             await writer.WriteLineAsync(stderr);
+         }
+     }
+     catch (Exception ex)
+     {
+         // Deliberately non-fatal: a failed log dump must not abort the caller.
+         Console.WriteLine($"Failed to save {role} container logs: {ex.Message}");
+     }
+ }
+}
diff --git a/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
b/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
index 17d88baaa..d8b28296c 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs
@@ -43,4 +43,10 @@ public interface IIggyClient : IIggyPublisher, IIggyStream,
IIggyTopic, IIggyCon
/// </remarks>
/// <param name="callback">The method previously registered for connection
event notifications to be removed.</param>
void UnsubscribeConnectionEvents(Func<ConnectionStateChangedEventArgs,
Task> callback);
+
+ /// <summary>
+ /// Gets the address the client currently uses to reach the server.
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): the exact string format is implementation specific and the value may be
+ /// empty when an implementation has no address yet — confirm against the concrete client
+ /// before parsing it.
+ /// </remarks>
+ /// <returns>The current address of the client.</returns>
+ string GetCurrentAddress();
}
diff --git
a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
index 958d6643e..6f1c5a0ce 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs
@@ -177,7 +177,8 @@ public class HttpMessageStream : IIggyClient
CancellationToken token = default)
{
var json = JsonSerializer.Serialize(
- new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize,
DurationHelpers.ToDuration(messageExpiry), replicationFactor),
+ new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize,
DurationHelpers.ToDuration(messageExpiry),
+ replicationFactor),
_jsonSerializerOptions);
var data = new StringContent(json, Encoding.UTF8, "application/json");
var response = await
_httpClient.PutAsync($"/streams/{streamId}/topics/{topicId}", data, token);
@@ -818,6 +819,12 @@ public class HttpMessageStream : IIggyClient
{
}
+ /// <inheritdoc />
+ // Yields the HttpClient base address as text, or an empty string when none is set.
+ public string GetCurrentAddress() => _httpClient.BaseAddress?.ToString() ?? string.Empty;
+
+
private static async Task HandleResponseAsync(HttpResponseMessage
response, bool shouldThrowOnGetNotFound = false)
{
if ((int)response.StatusCode > 300
diff --git
a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
index b3223673f..93a85e13e 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
@@ -47,7 +47,7 @@ public sealed class TcpMessageStream : IIggyClient
private readonly SemaphoreSlim _connectionSemaphore;
private readonly ILogger<TcpMessageStream> _logger;
private readonly SemaphoreSlim _sendingSemaphore;
- private ClusterNode? _currentLeaderNode;
+ private string _currentAddress = string.Empty;
private X509Certificate2Collection _customCaStore = [];
private bool _isConnecting;
private DateTimeOffset _lastConnectionTime;
@@ -89,6 +89,12 @@ public sealed class TcpMessageStream : IIggyClient
_connectionEvents.Unsubscribe(callback);
}
+ /// <inheritdoc />
+ // Empty until an address has been stored in _currentAddress.
+ public string GetCurrentAddress() => _currentAddress;
+
+
/// <inheritdoc />
public async Task<StreamResponse?> CreateStreamAsync(string name,
CancellationToken token = default)
{
@@ -736,6 +742,13 @@ public sealed class TcpMessageStream : IIggyClient
var userId =
BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..responseBuffer.Length]);
SetConnectionStateAsync(ConnectionState.Authenticated);
+
+ if (await RedirectAsync(token))
+ {
+ await ConnectAsync(token);
+ return await LoginUser(userName, password, token);
+ }
+
var authResponse = new AuthResponse(userId, null);
return authResponse;
}
@@ -803,6 +816,7 @@ public sealed class TcpMessageStream : IIggyClient
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);
+ SetConnectionStateAsync(ConnectionState.Authenticating);
var responseBuffer = await SendWithResponseAsync(payload, ct);
if (responseBuffer.Length <= 1)
@@ -812,8 +826,15 @@ public sealed class TcpMessageStream : IIggyClient
var userId =
BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..4]);
- //TODO: Figure out how to solve this workaround about default of
TokenInfo
- return new AuthResponse(userId, default);
+ SetConnectionStateAsync(ConnectionState.Authenticated);
+
+ if (await RedirectAsync(ct))
+ {
+ await ConnectAsync(ct);
+ return await LoginWithPersonalAccessToken(token, ct);
+ }
+
+ return new AuthResponse(userId, null);
}
private async Task TryEstablishConnectionAsync(CancellationToken token)
@@ -826,10 +847,12 @@ public sealed class TcpMessageStream : IIggyClient
_stream?.Close();
_stream?.Dispose();
- var connectionAddress = _currentLeaderNode != null
- ? $"{_currentLeaderNode.Ip}:{_currentLeaderNode.Endpoints.Tcp}"
- : _configuration.BaseAddress;
- var urlPortSplitter = connectionAddress.Split(":");
+ if (string.IsNullOrEmpty(_currentAddress))
+ {
+ _currentAddress = _configuration.BaseAddress;
+ }
+
+ var urlPortSplitter = _currentAddress.Split(":");
if (urlPortSplitter.Length > 2)
{
throw new InvalidBaseAddressException();
@@ -861,22 +884,6 @@ public sealed class TcpMessageStream : IIggyClient
_configuration.AutoLoginSettings.Username);
await LoginUser(_configuration.AutoLoginSettings.Username,
_configuration.AutoLoginSettings.Password, token);
-
- _currentLeaderNode = await
GetCurrentLeaderNodeAsync(token);
- if (_currentLeaderNode == null)
- {
- break;
- }
-
- var currentAddress =
$"{_currentLeaderNode.Ip}:{_currentLeaderNode.Endpoints.Tcp}";
- if (currentAddress == _configuration.BaseAddress)
- {
- break;
- }
-
- _logger.LogInformation("Leader address changed. Trying to
reconnect to {Address}",
- currentAddress);
- continue;
}
break;
@@ -1204,4 +1211,28 @@ public sealed class TcpMessageStream : IIggyClient
return false;
}
+
+ /// <summary>
+ /// Looks up the current leader node via <c>GetCurrentLeaderNodeAsync</c>. When its
+ /// "ip:tcp-port" address differs from <c>_currentAddress</c>, stores the leader address,
+ /// closes the current stream and sets the connection state to disconnected.
+ /// </summary>
+ /// <param name="token">Cancellation token passed to the leader lookup.</param>
+ /// <returns>
+ /// <c>true</c> when the address was switched and the caller has to connect again;
+ /// <c>false</c> when no leader node was returned or the client already uses its address.
+ /// </returns>
+ private async Task<bool> RedirectAsync(CancellationToken token)
+ {
+     var leaderNode = await GetCurrentLeaderNodeAsync(token);
+     if (leaderNode is null)
+     {
+         return false;
+     }
+
+     var leaderAddress = $"{leaderNode.Ip}:{leaderNode.Endpoints.Tcp}";
+     if (leaderAddress == _currentAddress)
+     {
+         return false;
+     }
+
+     _currentAddress = leaderAddress;
+
+     _logger.LogInformation("Leader address changed. Trying to reconnect to {Address}", leaderAddress);
+
+     // Null-conditional close, matching how TryEstablishConnectionAsync tears the stream down.
+     _stream?.Close();
+     SetConnectionStateAsync(ConnectionState.Disconnected);
+     return true;
+ }
}