This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch cluster-metadata in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 53921672a9490ebf9e30f641d4b07c68e6ed5bf0 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Nov 17 11:05:47 2025 +0100 feat(cluster): implement leader-aware connect mechanism in Rust SDK Implement automatic client redirection to cluster leader when initially connected to follower nodes. Ensures write operations always reach the leader while maintaining transparent SDK experience. Rust SDK changes: - Add leader_aware reconnection logic to TCP, QUIC, and WS clients - Implement connection info tracking for monitoring client state - Auto-discovery and reconnection to leader from cluster metadata - Update client APIs with get_connection_info() method Server changes: - Add --follower CLI flag to start node in follower-only mode - Enhance cluster configuration validation (node topology, transport) - Update TCP/QUIC/WebSocket impl with cluster-aware connection handling Testing infrastructure: - BDD test suite (leader_redirection.feature) - Docker Compose cluster setup (leader + follower nodes) - Helper modules for cluster testing and node management --- bdd/docker-compose.yml | 138 ++++++- bdd/rust/Cargo.toml | 13 +- bdd/rust/Dockerfile | 4 +- bdd/rust/tests/common/global_context.rs | 5 - bdd/rust/tests/common/leader_context.rs | 83 +++++ bdd/rust/tests/common/mod.rs | 1 + bdd/rust/tests/helpers/cluster.rs | 141 ++++++++ bdd/rust/tests/helpers/mod.rs | 1 + .../tests/{common/mod.rs => leader_redirection.rs} | 12 +- bdd/rust/tests/steps/leader_redirection.rs | 305 ++++++++++++++++ bdd/rust/tests/steps/mod.rs | 1 + bdd/rust/tests/steps/server.rs | 26 +- bdd/scenarios/leader_redirection.feature | 61 ++++ core/common/src/types/cluster/metadata.rs | 13 +- core/common/src/types/cluster/status.rs | 2 + core/common/src/types/configuration/transport.rs | 4 +- core/configs/server.toml | 10 +- core/sdk/src/client_wrappers/connection_info.rs | 61 ++++ core/sdk/src/client_wrappers/mod.rs | 1 + core/sdk/src/clients/binary_cluster.rs | 5 +- 
core/sdk/src/clients/binary_users.rs | 84 ++++- core/sdk/src/clients/client.rs | 17 +- core/sdk/src/leader_aware.rs | 209 +++++++++++ core/sdk/src/lib.rs | 1 + core/sdk/src/prelude.rs | 27 +- core/sdk/src/quic/quic_client.rs | 304 +++++++++------- core/sdk/src/tcp/tcp_client.rs | 402 ++++++++++++--------- core/sdk/src/websocket/websocket_client.rs | 171 +++++---- core/server/src/args.rs | 15 + core/server/src/configs/cluster.rs | 3 +- core/server/src/configs/validators.rs | 96 ++++- core/server/src/main.rs | 6 + core/server/src/quic/listener.rs | 38 +- core/server/src/shard/builder.rs | 7 + core/server/src/shard/mod.rs | 1 + core/server/src/shard/system/cluster.rs | 42 ++- core/server/src/tcp/connection_handler.rs | 33 +- 37 files changed, 1855 insertions(+), 488 deletions(-) diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml index 86bbc9fa9..b9469dd31 100644 --- a/bdd/docker-compose.yml +++ b/bdd/docker-compose.yml @@ -16,6 +16,7 @@ # under the License. services: + # Original single server for backward compatibility iggy-server: platform: linux/amd64 build: @@ -37,36 +38,159 @@ services: soft: -1 hard: -1 healthcheck: - test: ["CMD", "/usr/local/bin/iggy", "ping"] - interval: 1s + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8090", "ping"] + interval: 20s timeout: 3s retries: 30 start_period: 2s environment: - - IGGY_ROOT_USERNAME=iggy - - IGGY_ROOT_PASSWORD=iggy - RUST_LOG=info - IGGY_SYSTEM_PATH=local_data - IGGY_TCP_ADDRESS=0.0.0.0:8090 - IGGY_HTTP_ADDRESS=0.0.0.0:3000 - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8070 volumes: - iggy_data:/app/local_data networks: - iggy-bdd-network + # Leader server for cluster testing + iggy-leader: + platform: linux/amd64 + build: + context: .. 
+ dockerfile: core/server/Dockerfile + target: runtime-prebuilt + args: + PREBUILT_IGGY_SERVER: ${IGGY_SERVER_PATH:-target/debug/iggy-server} + PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy} + LIBC: glibc + PROFILE: debug + # Leader runs without --follower flag + command: ["--fresh", "--with-default-root-credentials"] + cap_add: + - SYS_NICE + security_opt: + - seccomp:unconfined + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8091", "ping"] + interval: 20s + timeout: 3s + retries: 30 + start_period: 2s + environment: + - RUST_LOG=info + - IGGY_SYSTEM_PATH=local_data_leader + # Server addresses + - IGGY_TCP_ADDRESS=0.0.0.0:8091 + - IGGY_HTTP_ADDRESS=0.0.0.0:3001 + - IGGY_QUIC_ADDRESS=0.0.0.0:8081 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8071 + # Cluster configuration + - IGGY_CLUSTER_ENABLED=true + - IGGY_CLUSTER_NAME=test-cluster + - IGGY_CLUSTER_ID=1 + - IGGY_CLUSTER_TRANSPORT=tcp + # This node's identity + - IGGY_CLUSTER_NODE_ID=0 + - IGGY_CLUSTER_NODE_NAME=leader-node + - IGGY_CLUSTER_NODE_ADDRESS=iggy-leader:8091 + # Cluster nodes configuration (indexed format for array) + - IGGY_CLUSTER_NODES_0_ID=0 + - IGGY_CLUSTER_NODES_0_NAME=leader-node + - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091 + - IGGY_CLUSTER_NODES_1_ID=1 + - IGGY_CLUSTER_NODES_1_NAME=follower-node + - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092 + volumes: + - iggy_leader_data:/app/local_data_leader + networks: + - iggy-bdd-network + + # Follower server for cluster testing + iggy-follower: + platform: linux/amd64 + build: + context: .. 
+ dockerfile: core/server/Dockerfile + target: runtime-prebuilt + args: + PREBUILT_IGGY_SERVER: ${IGGY_SERVER_PATH:-target/debug/iggy-server} + PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy} + LIBC: glibc + PROFILE: debug + # Follower runs with --follower flag + command: ["--fresh", "--with-default-root-credentials", "--follower"] + cap_add: + - SYS_NICE + security_opt: + - seccomp:unconfined + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8092", "ping"] + interval: 20s + timeout: 3s + retries: 30 + start_period: 2s + environment: + - RUST_LOG=info + - IGGY_SYSTEM_PATH=local_data_follower + # Server addresses (different ports from leader) + - IGGY_TCP_ADDRESS=0.0.0.0:8092 + - IGGY_HTTP_ADDRESS=0.0.0.0:3002 + - IGGY_QUIC_ADDRESS=0.0.0.0:8082 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8072 + # Cluster configuration (same as leader) + - IGGY_CLUSTER_ENABLED=true + - IGGY_CLUSTER_NAME=test-cluster + - IGGY_CLUSTER_ID=1 + - IGGY_CLUSTER_TRANSPORT=tcp + # This node's identity (different from leader) + - IGGY_CLUSTER_NODE_ID=1 + - IGGY_CLUSTER_NODE_NAME=follower-node + - IGGY_CLUSTER_NODE_ADDRESS=iggy-follower:8092 + # Cluster nodes configuration (indexed format for array) + - IGGY_CLUSTER_NODES_0_ID=0 + - IGGY_CLUSTER_NODES_0_NAME=leader-node + - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091 + - IGGY_CLUSTER_NODES_1_ID=1 + - IGGY_CLUSTER_NODES_1_NAME=follower-node + - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092 + volumes: + - iggy_follower_data:/app/local_data_follower + networks: + - iggy-bdd-network + rust-bdd: build: context: .. 
dockerfile: bdd/rust/Dockerfile depends_on: - - iggy-server + iggy-server: + condition: service_healthy + iggy-leader: + condition: service_healthy + iggy-follower: + condition: service_healthy environment: - IGGY_ROOT_USERNAME=iggy - IGGY_ROOT_PASSWORD=iggy - IGGY_TCP_ADDRESS=iggy-server:8090 + # Additional addresses for leader redirection tests + - IGGY_TCP_ADDRESS_LEADER=iggy-leader:8091 + - IGGY_TCP_ADDRESS_FOLLOWER=iggy-follower:8092 + - RUST_LOG=debug volumes: - ./scenarios/basic_messaging.feature:/app/features/basic_messaging.feature + - ./scenarios/leader_redirection.feature:/app/features/leader_redirection.feature command: [ "cargo", @@ -74,8 +198,6 @@ services: "-p", "bdd", "--features", - "iggy-server-in-docker", - "--features", "bdd", ] networks: @@ -149,3 +271,5 @@ networks: volumes: iggy_data: + iggy_leader_data: + iggy_follower_data: diff --git a/bdd/rust/Cargo.toml b/bdd/rust/Cargo.toml index 18f2bf88e..4182a31d9 100644 --- a/bdd/rust/Cargo.toml +++ b/bdd/rust/Cargo.toml @@ -23,7 +23,6 @@ edition = "2024" license = "Apache-2.0" [features] -iggy-server-in-docker = [] bdd = [] [dev-dependencies] @@ -41,11 +40,7 @@ name = "basic_messaging" harness = false required-features = ["bdd"] -# Future test scenarios can be added here: -# [[test]] -# name = "user_management" -# harness = false -# -# [[test]] -# name = "consumer_groups" -# harness = false +[[test]] +name = "leader_redirection" +harness = false +required-features = ["bdd"] diff --git a/bdd/rust/Dockerfile b/bdd/rust/Dockerfile index bdb84fb13..2e8cc8859 100644 --- a/bdd/rust/Dockerfile +++ b/bdd/rust/Dockerfile @@ -27,7 +27,7 @@ COPY . . 
RUN mkdir -p /app/features # Build tests -RUN cargo test --no-run -p bdd --features "iggy-server-in-docker bdd" +RUN cargo test --no-run -p bdd # Default command will be overridden by docker-compose -CMD ["cargo", "test", "-p", "bdd", "--features", "iggy-server-in-docker", "--features", "bdd"] +CMD ["cargo", "test", "-p", "bdd", "--features", "bdd"] diff --git a/bdd/rust/tests/common/global_context.rs b/bdd/rust/tests/common/global_context.rs index a526f873e..a425c6776 100644 --- a/bdd/rust/tests/common/global_context.rs +++ b/bdd/rust/tests/common/global_context.rs @@ -20,13 +20,8 @@ use cucumber::World; use iggy::clients::client::IggyClient; use iggy::prelude::{IggyMessage, PolledMessages}; -#[cfg(not(feature = "iggy-server-in-docker"))] -use integration::test_server::TestServer; - #[derive(Debug, World, Default)] pub struct GlobalContext { - #[cfg(not(feature = "iggy-server-in-docker"))] - pub server: Option<TestServer>, pub client: Option<IggyClient>, pub server_addr: Option<String>, pub last_stream_id: Option<u32>, diff --git a/bdd/rust/tests/common/leader_context.rs b/bdd/rust/tests/common/leader_context.rs new file mode 100644 index 000000000..0427fa81e --- /dev/null +++ b/bdd/rust/tests/common/leader_context.rs @@ -0,0 +1,83 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use cucumber::World; +use iggy::clients::client::IggyClient; +use iggy::prelude::ClusterNode; +use std::collections::HashMap; + +#[derive(Debug, World, Default)] +pub struct LeaderContext { + // Clients by name (e.g., "main", "A", "B") + pub clients: HashMap<String, IggyClient>, + + // Server configurations + pub servers: ServerConfigs, + + // Cluster state + pub cluster: ClusterState, + + // Test state tracking + pub test_state: TestState, +} + +#[derive(Debug, Default)] +pub struct ServerConfigs { + // Server addresses by role (e.g., "leader" -> "iggy-leader:8091") + pub addresses: HashMap<String, String>, +} + +#[derive(Debug, Default)] +pub struct ClusterState { + pub enabled: bool, + pub nodes: Vec<ClusterNode>, +} + +#[derive(Debug, Default)] +pub struct TestState { + pub redirection_occurred: bool, + pub last_stream_id: Option<u32>, + pub last_stream_name: Option<String>, +} + +impl LeaderContext { + /// Gets or creates a client by name + pub fn get_client(&self, name: &str) -> Option<&IggyClient> { + self.clients.get(name) + } + + /// Stores a client by name + pub fn store_client(&mut self, name: String, client: IggyClient) { + self.clients.insert(name, client); + } + + /// Stores a server address by role + pub fn store_server_addr(&mut self, role: String, addr: String) { + self.servers.addresses.insert(role, addr); + } + + /// Gets a server address by role + pub fn get_server_addr(&self, role: &str) -> Option<&String> { + self.servers.addresses.get(role) + } + + /// Adds a node to the cluster configuration + pub fn add_node(&mut self, node: ClusterNode) { + self.cluster.nodes.push(node); + } +} diff --git a/bdd/rust/tests/common/mod.rs b/bdd/rust/tests/common/mod.rs index 0c28f3a53..16682d2a9 100644 --- a/bdd/rust/tests/common/mod.rs +++ b/bdd/rust/tests/common/mod.rs @@ -17,3 +17,4 @@ */ pub mod global_context; +pub mod leader_context; diff 
--git a/bdd/rust/tests/helpers/cluster.rs b/bdd/rust/tests/helpers/cluster.rs new file mode 100644 index 000000000..71ef991af --- /dev/null +++ b/bdd/rust/tests/helpers/cluster.rs @@ -0,0 +1,141 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use iggy::clients::client::IggyClient; +use iggy::prelude::*; +use integration::tcp_client::TcpClientFactory; +use integration::test_server::ClientFactory; +use std::env; + +/// Resolves server address based on role and port, checking environment variables first +pub fn resolve_server_address(role: &str, port: u16) -> String { + match (role.to_lowercase().as_str(), port) { + ("leader", 8091) => { + env::var("IGGY_TCP_ADDRESS_LEADER").unwrap_or_else(|_| "iggy-leader:8091".to_string()) + } + ("follower", 8092) => env::var("IGGY_TCP_ADDRESS_FOLLOWER") + .unwrap_or_else(|_| "iggy-follower:8092".to_string()), + ("single", 8090) | (_, 8090) => { + env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| "iggy-server:8090".to_string()) + } + _ => format!("iggy-server:{}", port), + } +} + +/// Creates and connects a client to the specified address +pub async fn create_and_connect_client(addr: &str) -> IggyClient { + let client_factory = TcpClientFactory { + server_addr: addr.to_string(), + ..Default::default() + }; + + let client = client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + + client.connect().await.expect("Client should connect"); + client +} + +/// Verifies that a client is connected to the expected port +pub async fn verify_client_connection( + client: &IggyClient, + expected_port: u16, +) -> Result<String, String> { + let conn_info = client.get_connection_info().await; + + if !conn_info + .server_address + .contains(&format!(":{}", expected_port)) + { + return Err(format!( + "Expected connection to port {}, but connected to: {}", + expected_port, conn_info.server_address + )); + } + + // Verify client can communicate + client + .ping() + .await + .map_err(|e| format!("Client cannot ping server: {}", e))?; + + Ok(conn_info.server_address) +} + +/// Checks if cluster metadata contains a healthy leader node +pub async fn verify_leader_in_metadata(client: &IggyClient) -> Result<Option<ClusterNode>, String> { + match 
client.get_cluster_metadata().await { + Ok(metadata) => { + let leader = metadata.nodes.into_iter().find(|n| { + matches!(n.role, ClusterNodeRole::Leader) + && matches!(n.status, ClusterNodeStatus::Healthy) + }); + Ok(leader) + } + Err(e) if is_clustering_unavailable(&e) => { + // Clustering not enabled, this is OK + Ok(None) + } + Err(e) => Err(format!("Failed to get cluster metadata: {}", e)), + } +} + +/// Checks if an error indicates clustering is not available +pub fn is_clustering_unavailable(error: &IggyError) -> bool { + matches!( + error, + IggyError::FeatureUnavailable | IggyError::InvalidCommand + ) +} + +/// Updates a node's role in the cluster configuration +pub fn update_node_role( + nodes: &mut [ClusterNode], + node_id: u32, + port: u16, + role: ClusterNodeRole, +) -> bool { + if let Some(node) = nodes + .iter_mut() + .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", port))) + { + node.role = role; + node.status = ClusterNodeStatus::Healthy; + true + } else { + false + } +} + +/// Extracts port number from an address string +pub fn extract_port_from_address(address: &str) -> Option<u16> { + address + .rsplit(':') + .next() + .and_then(|port_str| port_str.parse().ok()) +} + +/// Determines server type from port number +pub fn server_type_from_port(port: u16) -> &'static str { + match port { + 8090 => "single", + 8091 => "leader", + 8092 => "follower", + _ => "unknown", + } +} diff --git a/bdd/rust/tests/helpers/mod.rs b/bdd/rust/tests/helpers/mod.rs index b9f11962a..2e60009b1 100644 --- a/bdd/rust/tests/helpers/mod.rs +++ b/bdd/rust/tests/helpers/mod.rs @@ -16,4 +16,5 @@ * under the License. 
*/ +pub mod cluster; pub mod test_data; diff --git a/bdd/rust/tests/common/mod.rs b/bdd/rust/tests/leader_redirection.rs similarity index 75% copy from bdd/rust/tests/common/mod.rs copy to bdd/rust/tests/leader_redirection.rs index 0c28f3a53..08598565e 100644 --- a/bdd/rust/tests/common/mod.rs +++ b/bdd/rust/tests/leader_redirection.rs @@ -16,4 +16,14 @@ * under the License. */ -pub mod global_context; +pub(crate) mod common; +pub(crate) mod helpers; +pub(crate) mod steps; + +use crate::common::leader_context::LeaderContext; +use cucumber::World; + +#[tokio::main] +async fn main() { + LeaderContext::run("../../bdd/scenarios/leader_redirection.feature").await; +} diff --git a/bdd/rust/tests/steps/leader_redirection.rs b/bdd/rust/tests/steps/leader_redirection.rs new file mode 100644 index 000000000..7d484ff3d --- /dev/null +++ b/bdd/rust/tests/steps/leader_redirection.rs @@ -0,0 +1,305 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::common::leader_context::LeaderContext; +use crate::helpers::cluster; +use cucumber::{given, then, when}; +use iggy::prelude::*; +use integration::test_server::login_root; +use std::time::Duration; +use tokio::time::sleep; + +// ============================================================================ +// Background steps for cluster configuration +// ============================================================================ + +#[given(regex = r"^I have cluster configuration enabled with (\d+) nodes$")] +async fn given_cluster_config(world: &mut LeaderContext, node_count: usize) { + world.cluster.enabled = true; + world.cluster.nodes = Vec::with_capacity(node_count); +} + +#[given(regex = r"^node (\d+) is configured on port (\d+)$")] +async fn given_node_configured(world: &mut LeaderContext, node_id: u32, port: u16) { + let node = ClusterNode { + id: node_id, + name: format!("node-{}", node_id), + address: format!("iggy-server:{}", port), + role: ClusterNodeRole::Follower, + status: ClusterNodeStatus::Healthy, + }; + world.add_node(node); +} + +#[given(regex = r"^I start server (\d+) on port (\d+) as (leader|follower)$")] +async fn given_start_clustered_server( + world: &mut LeaderContext, + node_id: u32, + port: u16, + role: String, +) { + // Clustered server (leader or follower) + let addr = cluster::resolve_server_address(&role, port); + world.store_server_addr(role.clone(), addr); + + // Update node role in cluster configuration + let node_role = match role.as_str() { + "leader" => ClusterNodeRole::Leader, + "follower" => ClusterNodeRole::Follower, + _ => unreachable!("Regex ensures only leader or follower"), + }; + + cluster::update_node_role(&mut world.cluster.nodes, node_id, port, node_role); +} + +#[given(regex = r"^I start a single server on port (\d+) without clustering enabled$")] +async fn given_start_single_server(world: &mut LeaderContext, port: u16) { + // Single server without clustering + let addr = 
cluster::resolve_server_address("single", port); + world.store_server_addr("single".to_string(), addr); + world.cluster.enabled = false; +} + +#[when(regex = r"^I create a client connecting to (follower|leader) on port (\d+)$")] +async fn when_create_client_to_role(world: &mut LeaderContext, role: String, _port: u16) { + let addr = world + .get_server_addr(&role) + .unwrap_or_else(|| panic!("{} server should be configured", role)) + .clone(); + + let client = cluster::create_and_connect_client(&addr).await; + world.store_client("main".to_string(), client); + + // Track redirection expectation + if role == "leader" { + world.test_state.redirection_occurred = false; + } +} + +#[when(regex = r"^I create a client connecting directly to leader on port (\d+)$")] +async fn when_create_client_direct_to_leader(world: &mut LeaderContext, port: u16) { + let addr = world + .get_server_addr("leader") + .expect("Leader server should be configured") + .clone(); + + // Verify the leader is on the expected port + assert!( + addr.contains(&format!(":{}", port)), + "Leader should be on port {}, but address is {}", + port, + addr + ); + + let client = cluster::create_and_connect_client(&addr).await; + world.store_client("main".to_string(), client); + world.test_state.redirection_occurred = false; +} + +#[when(regex = r"^I create a client connecting to port (\d+)$")] +async fn when_create_client_to_port(world: &mut LeaderContext, port: u16) { + let role = cluster::server_type_from_port(port); + let addr = world + .get_server_addr(role) + .unwrap_or_else(|| panic!("Server on port {} should be configured", port)) + .clone(); + + let client = cluster::create_and_connect_client(&addr).await; + world.store_client("main".to_string(), client); +} + +#[when(regex = r"^I create client ([A-Z]) connecting to port (\d+)$")] +async fn when_create_named_client(world: &mut LeaderContext, client_name: String, port: u16) { + // Determine which server based on port + let role = 
cluster::server_type_from_port(port); + let addr = world + .get_server_addr(role) + .unwrap_or_else(|| panic!("Server on port {} should be configured", port)) + .clone(); + + let client = cluster::create_and_connect_client(&addr).await; + world.store_client(client_name, client); +} + +#[when(regex = r"^(?:I|both clients) authenticate as root user$")] +async fn when_authenticate_root(world: &mut LeaderContext) { + // Determine if we're authenticating all clients or just "main" + let client_names: Vec<String> = if world.clients.len() > 1 { + world.clients.keys().cloned().collect() + } else { + vec!["main".to_string()] + }; + + for client_name in client_names { + let client = world + .get_client(&client_name) + .unwrap_or_else(|| panic!("Client {} should be created", client_name)); + + login_root(client).await; + + // Small delay between multiple authentications to avoid race conditions + if world.clients.len() > 1 { + sleep(Duration::from_millis(100)).await; + } + } +} + +#[when(regex = r#"^I create a stream named "(.+)"$"#)] +async fn when_create_stream(world: &mut LeaderContext, stream_name: String) { + let client = world + .get_client("main") + .expect("Client should be available"); + + let stream = client + .create_stream(&stream_name) + .await + .expect("Should be able to create stream"); + + world.test_state.last_stream_id = Some(stream.id); + world.test_state.last_stream_name = Some(stream.name.clone()); +} + +#[then("the stream should be created successfully on the leader")] +async fn then_stream_created_successfully(world: &mut LeaderContext) { + assert!( + world.test_state.last_stream_id.is_some(), + "Stream should have been created on leader" + ); +} + +#[then( + regex = r"^the client should (?:automatically redirect to leader on|stay connected to|redirect to) port (\d+)$" +)] +async fn then_verify_client_port(world: &mut LeaderContext, expected_port: u16) { + let client = world.get_client("main").expect("Client should exist"); + + // Verify connection to 
expected port + cluster::verify_client_connection(client, expected_port) + .await + .expect("Connection verification should succeed"); + + // Check cluster metadata if available + if let Ok(Some(leader)) = cluster::verify_leader_in_metadata(client).await { + // If we found a leader and we're connected to the leader port, mark redirection + if cluster::extract_port_from_address(&leader.address) == Some(expected_port) { + world.test_state.redirection_occurred = true; + } + } +} + +#[then(regex = r"^client ([A-Z]) should (?:stay connected to|redirect to) port (\d+)$")] +async fn then_verify_named_client_port( + world: &mut LeaderContext, + client_name: String, + expected_port: u16, +) { + let client = world + .get_client(&client_name) + .unwrap_or_else(|| panic!("Client {} should exist", client_name)); + + cluster::verify_client_connection(client, expected_port) + .await + .unwrap_or_else(|_| { + panic!( + "Client {} connection verification should succeed", + client_name + ) + }); + + if let Ok(Some(leader)) = cluster::verify_leader_in_metadata(client).await { + assert!( + cluster::extract_port_from_address(&leader.address).is_some(), + "Client {} should find valid leader in cluster metadata", + client_name + ); + } +} + +#[then("the client should not perform any redirection")] +async fn then_no_redirection(world: &mut LeaderContext) { + assert!( + !world.test_state.redirection_occurred, + "No redirection should occur when connecting directly to leader" + ); +} + +#[then(regex = r"^the connection should remain on port (\d+)$")] +async fn then_connection_remains(world: &mut LeaderContext, port: u16) { + let client = world.get_client("main").expect("Client should exist"); + + cluster::verify_client_connection(client, port) + .await + .expect("Should remain on original port"); + + assert!( + !world.test_state.redirection_occurred, + "Connection should not have been redirected" + ); +} + +#[then("the client should connect successfully without redirection")] +async fn 
then_connect_without_redirection(world: &mut LeaderContext) { + let client = world.get_client("main").expect("Client should exist"); + + client + .ping() + .await + .expect("Client should be able to ping server"); + + assert!( + !world.test_state.redirection_occurred, + "No redirection should occur without clustering" + ); +} + +#[then("both clients should be using the same server")] +async fn then_both_use_same_server(world: &mut LeaderContext) { + let client_a = world.get_client("A").expect("Client A should exist"); + let client_b = world.get_client("B").expect("Client B should exist"); + + // Get connection info for both clients + let conn_info_a = client_a.get_connection_info().await; + let conn_info_b = client_b.get_connection_info().await; + + // Verify both clients are connected to the same server + assert_eq!( + conn_info_a.server_address, conn_info_b.server_address, + "Both clients should be connected to the same server" + ); + + // Verify both can communicate + client_a + .ping() + .await + .expect("Client A should be able to ping"); + client_b + .ping() + .await + .expect("Client B should be able to ping"); + + // Verify cluster metadata consistency if available + if let (Ok(Some(leader_a)), Ok(Some(leader_b))) = ( + cluster::verify_leader_in_metadata(client_a).await, + cluster::verify_leader_in_metadata(client_b).await, + ) { + assert_eq!( + leader_a.address, leader_b.address, + "Both clients should see the same leader" + ); + } +} diff --git a/bdd/rust/tests/steps/mod.rs b/bdd/rust/tests/steps/mod.rs index e9d00b626..45c8bde63 100644 --- a/bdd/rust/tests/steps/mod.rs +++ b/bdd/rust/tests/steps/mod.rs @@ -17,6 +17,7 @@ */ pub mod auth; +pub mod leader_redirection; pub mod messages; pub mod server; pub mod streams; diff --git a/bdd/rust/tests/steps/server.rs b/bdd/rust/tests/steps/server.rs index ad688f73b..a4027875e 100644 --- a/bdd/rust/tests/steps/server.rs +++ b/bdd/rust/tests/steps/server.rs @@ -19,28 +19,10 @@ use 
crate::common::global_context::GlobalContext; use cucumber::given; -#[cfg(not(feature = "iggy-server-in-docker"))] -use integration::test_server::TestServer; - #[given("I have a running Iggy server")] pub async fn given_running_server(world: &mut GlobalContext) { - #[cfg(feature = "iggy-server-in-docker")] - { - // External server mode - connect to server from environment - let server_addr = - std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| "localhost:8090".to_string()); - world.server_addr = Some(server_addr); - // No TestServer instance in external mode - } - - #[cfg(not(feature = "iggy-server-in-docker"))] - { - // Embedded server mode - start our own TestServer - let mut test_server = TestServer::default(); - test_server.start(); - let server_addr = test_server.get_raw_tcp_addr().unwrap(); - - world.server_addr = Some(server_addr); - world.server = Some(test_server); - } + // External server mode - connect to server from environment + let server_addr = + std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| "localhost:8090".to_string()); + world.server_addr = Some(server_addr); } diff --git a/bdd/scenarios/leader_redirection.feature b/bdd/scenarios/leader_redirection.feature new file mode 100644 index 000000000..0f6c76701 --- /dev/null +++ b/bdd/scenarios/leader_redirection.feature @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +Feature: Leader-Aware Client Connections + As a developer using Apache Iggy with clustering + I want my SDK clients to automatically connect to the leader node + So that I can always interact with the correct server instance + + Background: + Given I have cluster configuration enabled with 2 nodes + And node 0 is configured on port 8091 + And node 1 is configured on port 8092 + + Scenario: Client redirects from follower to leader + Given I start server 0 on port 8091 as leader + And I start server 1 on port 8092 as follower + When I create a client connecting to follower on port 8092 + And I authenticate as root user + Then the client should automatically redirect to leader on port 8091 + When I create a stream named "test-stream" + Then the stream should be created successfully on the leader + + Scenario: Client connects directly to leader without redirection + Given I start server 0 on port 8091 as leader + And I start server 1 on port 8092 as follower + When I create a client connecting directly to leader on port 8091 + And I authenticate as root user + Then the client should not perform any redirection + And the connection should remain on port 8091 + + Scenario: Client handles missing cluster metadata gracefully + Given I start a single server on port 8090 without clustering enabled + When I create a client connecting to port 8090 + And I authenticate as root user + Then the client should connect successfully without redirection + When I create a stream named "single-server-stream" + Then the stream should be created successfully on the leader + + 
Scenario: Multiple clients converge to the same leader + Given I start server 0 on port 8091 as leader + And I start server 1 on port 8092 as follower + When I create client A connecting to port 8091 + And I create client B connecting to port 8092 + And both clients authenticate as root user + Then client A should stay connected to port 8091 + And client B should redirect to port 8091 + And both clients should be using the same server \ No newline at end of file diff --git a/core/common/src/types/cluster/metadata.rs b/core/common/src/types/cluster/metadata.rs index 1c57cb26d..c89e94084 100644 --- a/core/common/src/types/cluster/metadata.rs +++ b/core/common/src/types/cluster/metadata.rs @@ -167,13 +167,16 @@ impl BytesSerializable for ClusterMetadata { impl Display for ClusterMetadata { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let nodes = self + .nodes + .iter() + .map(|node| node.to_string()) + .collect::<Vec<_>>(); + write!( f, - "ClusterMetadata {{ name: {}, id: {}, transport: {}, nodes: {} }}", - self.name, - self.id, - self.transport, - self.nodes.len() + "ClusterMetadata {{ name: {}, id: {}, transport: {}, nodes: {:?} }}", + self.name, self.id, self.transport, nodes ) } } diff --git a/core/common/src/types/cluster/status.rs b/core/common/src/types/cluster/status.rs index e60932f1a..ddb8c07d3 100644 --- a/core/common/src/types/cluster/status.rs +++ b/core/common/src/types/cluster/status.rs @@ -38,6 +38,8 @@ pub enum ClusterNodeStatus { Unreachable, /// Node is in maintenance mode Maintenance, + /// Node is unknown + Unknown, } impl TryFrom<u8> for ClusterNodeStatus { diff --git a/core/common/src/types/configuration/transport.rs b/core/common/src/types/configuration/transport.rs index 45c78eb63..1b6dca023 100644 --- a/core/common/src/types/configuration/transport.rs +++ b/core/common/src/types/configuration/transport.rs @@ -81,7 +81,9 @@ impl<'de> Deserialize<'de> for TransportProtocol { type Value = TransportProtocol; fn 
expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a string (\"tcp\", \"quic\", \"http\") or a number (1, 2, 3)") + formatter.write_str( + "a string (\"tcp\", \"quic\", \"http\", \"ws\") or a number (1, 2, 3, 4)", + ) } fn visit_str<E>(self, value: &str) -> Result<TransportProtocol, E> diff --git a/core/configs/server.toml b/core/configs/server.toml index db689b3ea..892c8cdbf 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -511,7 +511,7 @@ enabled = false # Unique cluster identifier (u32). # All nodes in the same cluster must share the same cluster id. -id = 1 +id = 0 # Unique cluster name (string). # All nodes in the same cluster must share the same name. @@ -519,23 +519,23 @@ id = 1 name = "iggy-cluster" # Transport protocol used for communication between nodes in the cluster (string). -# Supported values: "tcp", "quic", "http" +# Supported values: "tcp", "quic", "ws" transport = "tcp" # Current node configuration [cluster.node] # This node unique identifier within the cluster (u32). # Must be unique across all nodes and match one of the ids in the nodes list. -id = 1 +id = 0 # All nodes in the cluster. [[cluster.nodes]] -id = 1 +id = 0 name = "iggy-node-1" address = "127.0.0.1:8090" [[cluster.nodes]] -id = 2 +id = 1 name = "iggy-node-2" address = "127.0.0.1:8091" diff --git a/core/sdk/src/client_wrappers/connection_info.rs b/core/sdk/src/client_wrappers/connection_info.rs new file mode 100644 index 000000000..44720e05a --- /dev/null +++ b/core/sdk/src/client_wrappers/connection_info.rs @@ -0,0 +1,61 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::client_wrappers::client_wrapper::ClientWrapper; +use iggy_common::TransportProtocol; + +/// Connection information for the current client connection +#[derive(Debug, Clone)] +pub struct ConnectionInfo { + /// The transport protocol being used + pub protocol: TransportProtocol, + /// The current server address the client is connected to + pub server_address: String, +} + +impl ClientWrapper { + /// Returns the current connection information including protocol and server address + pub async fn get_connection_info(&self) -> ConnectionInfo { + match self { + ClientWrapper::Iggy(_) => { + // This variant should not be used in practice as IggyClient always wraps + // one of the concrete transport clients. Return a default/placeholder. 
+ ConnectionInfo { + protocol: TransportProtocol::Tcp, + server_address: String::from("unknown"), + } + } + ClientWrapper::Tcp(client) => ConnectionInfo { + protocol: TransportProtocol::Tcp, + server_address: client.current_server_address.lock().await.clone(), + }, + ClientWrapper::Quic(client) => ConnectionInfo { + protocol: TransportProtocol::Quic, + server_address: client.current_server_address.lock().await.clone(), + }, + ClientWrapper::Http(client) => ConnectionInfo { + protocol: TransportProtocol::Http, + server_address: client.api_url.to_string(), + }, + ClientWrapper::WebSocket(client) => ConnectionInfo { + protocol: TransportProtocol::WebSocket, + server_address: client.current_server_address.lock().await.clone(), + }, + } + } +} diff --git a/core/sdk/src/client_wrappers/mod.rs b/core/sdk/src/client_wrappers/mod.rs index bd093e01b..2cf980606 100644 --- a/core/sdk/src/client_wrappers/mod.rs +++ b/core/sdk/src/client_wrappers/mod.rs @@ -29,3 +29,4 @@ mod binary_system_client; mod binary_topic_client; mod binary_user_client; pub mod client_wrapper; +pub mod connection_info; diff --git a/core/sdk/src/clients/binary_cluster.rs b/core/sdk/src/clients/binary_cluster.rs index 18ffc6f9b..6ab7ead4a 100644 --- a/core/sdk/src/clients/binary_cluster.rs +++ b/core/sdk/src/clients/binary_cluster.rs @@ -19,14 +19,11 @@ use crate::prelude::IggyClient; use async_trait::async_trait; use iggy_binary_protocol::ClusterClient; -use iggy_common::{ClusterMetadata, IggyError}; +use iggy_common::{ClusterMetadata, IggyError, locking::IggyRwLockFn}; #[async_trait] impl ClusterClient for IggyClient { async fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> { - todo!(); - /* self.client.read().await.get_cluster_metadata().await - */ } } diff --git a/core/sdk/src/clients/binary_users.rs b/core/sdk/src/clients/binary_users.rs index c13ab9288..3e7f8ae8b 100644 --- a/core/sdk/src/clients/binary_users.rs +++ b/core/sdk/src/clients/binary_users.rs @@ -16,13 +16,16 @@ * under 
the License. */ +use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::leader_aware::check_and_redirect_to_leader; use crate::prelude::IggyClient; use async_trait::async_trait; -use iggy_binary_protocol::UserClient; +use iggy_binary_protocol::{Client, UserClient}; use iggy_common::locking::IggyRwLockFn; use iggy_common::{ Identifier, IdentityInfo, IggyError, Permissions, UserInfo, UserInfoDetails, UserStatus, }; +use tracing::info; #[async_trait] impl UserClient for IggyClient { @@ -91,11 +94,86 @@ impl UserClient for IggyClient { } async fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError> { - self.client + let identity = self + .client .read() .await .login_user(username, password) - .await + .await?; + + let new_leader_addr = { + let client = self.client.read().await; + match &*client { + ClientWrapper::Tcp(tcp_client) => { + let current_addr = tcp_client.current_server_address.lock().await.clone(); + check_and_redirect_to_leader(tcp_client, &current_addr).await? + } + ClientWrapper::Quic(quic_client) => { + let current_addr = quic_client.current_server_address.lock().await.clone(); + check_and_redirect_to_leader(quic_client, &current_addr).await? + } + ClientWrapper::WebSocket(ws_client) => { + let current_addr = ws_client.current_server_address.lock().await.clone(); + check_and_redirect_to_leader(ws_client, &current_addr).await? 
+ } + _ => None, + } + }; + + if let Some(leader_addr) = new_leader_addr { + info!( + "Redirecting to leader at {} after manual login", + leader_addr + ); + + // Clear connected_at to avoid reestablish_after delay during redirection + { + let client = self.client.read().await; + match &*client { + ClientWrapper::Tcp(tcp_client) => { + tcp_client.connected_at.lock().await.take(); + } + ClientWrapper::Quic(quic_client) => { + quic_client.connected_at.lock().await.take(); + } + ClientWrapper::WebSocket(ws_client) => { + ws_client.connected_at.lock().await.take(); + } + _ => {} + } + } + + self.client.read().await.disconnect().await?; + + { + let client = self.client.read().await; + match &*client { + ClientWrapper::Tcp(tcp_client) => { + *tcp_client.current_server_address.lock().await = leader_addr.clone(); + } + ClientWrapper::Quic(quic_client) => { + *quic_client.current_server_address.lock().await = leader_addr.clone(); + } + ClientWrapper::WebSocket(ws_client) => { + *ws_client.current_server_address.lock().await = leader_addr.clone(); + } + _ => {} + } + } + + self.client.read().await.connect().await?; + + let identity = self + .client + .read() + .await + .login_user(username, password) + .await?; + + Ok(identity) + } else { + Ok(identity) + } } async fn logout_user(&self) -> Result<(), IggyError> { diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs index 15b39db2b..b2dd19689 100644 --- a/core/sdk/src/clients/client.rs +++ b/core/sdk/src/clients/client.rs @@ -16,11 +16,9 @@ * under the License. 
*/ -use crate::clients::client_builder::IggyClientBuilder; -use iggy_common::Consumer; -use iggy_common::locking::{IggyRwLock, IggyRwLockFn}; - use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::connection_info::ConnectionInfo; +use crate::clients::client_builder::IggyClientBuilder; use crate::http::http_client::HttpClient; use crate::prelude::EncryptorKind; use crate::prelude::IggyConsumerBuilder; @@ -32,6 +30,8 @@ use crate::websocket::websocket_client::WebSocketClient; use async_broadcast::Receiver; use async_trait::async_trait; use iggy_binary_protocol::{Client, SystemClient}; +use iggy_common::Consumer; +use iggy_common::locking::{IggyRwLock, IggyRwLockFn}; use iggy_common::{ConnectionStringUtils, DiagnosticEvent, Partitioner, TransportProtocol}; use std::fmt::Debug; use std::sync::Arc; @@ -63,7 +63,6 @@ impl IggyClient { IggyClientBuilder::new() } - /// Creates a new `IggyClientBuilder` from the provided connection string. /// Creates a new `IggyClientBuilder` from the provided connection string. pub fn builder_from_connection_string( connection_string: &str, @@ -81,7 +80,6 @@ impl IggyClient { } } - /// Creates a new `IggyClient` from the provided connection string. /// Creates a new `IggyClient` from the provided connection string. pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { match ConnectionStringUtils::parse_protocol(connection_string)? { @@ -177,6 +175,13 @@ impl IggyClient { None, )) } + + /// Returns the current connection information including the transport protocol and server address. + /// This is useful for verifying which server the client is connected to, especially after + /// leader redirection in a clustered environment. 
+ pub async fn get_connection_info(&self) -> ConnectionInfo { + self.client.read().await.get_connection_info().await + } } #[async_trait] diff --git a/core/sdk/src/leader_aware.rs b/core/sdk/src/leader_aware.rs new file mode 100644 index 000000000..9367a7b13 --- /dev/null +++ b/core/sdk/src/leader_aware.rs @@ -0,0 +1,209 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use iggy_binary_protocol::ClusterClient; +use iggy_common::{ + ClusterMetadata, ClusterNodeRole, ClusterNodeStatus, IggyError, IggyErrorDiscriminants, +}; +use std::net::SocketAddr; +use std::str::FromStr; +use tracing::{debug, info, warn}; + +/// Maximum number of leader redirections to prevent infinite loops +const MAX_LEADER_REDIRECTS: u8 = 3; + +/// Check if we need to redirect to leader and return the leader address if redirection is needed +pub async fn check_and_redirect_to_leader<C: ClusterClient>( + client: &C, + current_address: &str, +) -> Result<Option<String>, IggyError> { + debug!("Checking cluster metadata for leader detection"); + + match client.get_cluster_metadata().await { + Ok(metadata) => { + debug!( + "Got cluster metadata: {} nodes, cluster: {}", + metadata.nodes.len(), + metadata.name + ); + process_cluster_metadata(&metadata, current_address) + } + Err(e) if is_feature_unavailable_error(&e) => { + debug!("Cluster metadata feature unavailable - server doesn't support clustering"); + Ok(None) + } + Err(e) => { + warn!( + "Failed to get cluster metadata: {}, connection will continue on server node {}", + e, current_address + ); + Ok(None) + } + } +} + +/// Process cluster metadata and determine if redirection is needed +fn process_cluster_metadata( + metadata: &ClusterMetadata, + current_address: &str, +) -> Result<Option<String>, IggyError> { + let leader = metadata + .nodes + .iter() + .find(|n| n.role == ClusterNodeRole::Leader && n.status == ClusterNodeStatus::Healthy); + + match leader { + Some(leader_node) => { + info!( + "Found leader node: {} at {}", + leader_node.name, leader_node.address + ); + + if !is_same_address(current_address, &leader_node.address) { + info!( + "Current connection to {} is not the leader, will redirect to {}", + current_address, leader_node.address + ); + Ok(Some(leader_node.address.clone())) + } else { + debug!("Already connected to leader at {}", current_address); + Ok(None) + } + } + None => { + 
warn!( + "No active leader found in cluster metadata, connection will continue on server node {}", + current_address + ); + Ok(None) + } + } +} + +/// Check if two addresses refer to the same endpoint +/// Handles various formats like 127.0.0.1:8090 vs localhost:8090 +fn is_same_address(addr1: &str, addr2: &str) -> bool { + match (parse_address(addr1), parse_address(addr2)) { + (Some(sock1), Some(sock2)) => sock1.ip() == sock2.ip() && sock1.port() == sock2.port(), + _ => normalize_address(addr1) == normalize_address(addr2), + } +} + +/// Parse address string to SocketAddr, handling various formats +fn parse_address(addr: &str) -> Option<SocketAddr> { + if let Ok(socket_addr) = SocketAddr::from_str(addr) { + return Some(socket_addr); + } + + let normalized = addr + .replace("localhost", "127.0.0.1") + .replace("[::]", "[::1]"); + + SocketAddr::from_str(&normalized).ok() +} + +/// Normalize address string for comparison +fn normalize_address(addr: &str) -> String { + addr.to_lowercase() + .replace("localhost", "127.0.0.1") + .replace("[::]", "[::1]") +} + +/// Check if the error indicates that the feature is unavailable +fn is_feature_unavailable_error(error: &IggyError) -> bool { + matches!( + error, + IggyError::FeatureUnavailable | IggyError::InvalidCommand + ) || error.as_code() == IggyErrorDiscriminants::FeatureUnavailable as u32 +} + +/// Struct to track leader redirection state +#[derive(Debug, Clone)] +pub struct LeaderRedirectionState { + pub redirect_count: u8, + pub last_leader_address: Option<String>, +} + +impl LeaderRedirectionState { + pub fn new() -> Self { + Self { + redirect_count: 0, + last_leader_address: None, + } + } + + pub fn can_redirect(&self) -> bool { + self.redirect_count < MAX_LEADER_REDIRECTS + } + + pub fn increment_redirect(&mut self, leader_address: String) { + self.redirect_count += 1; + self.last_leader_address = Some(leader_address); + } + + pub fn reset(&mut self) { + self.redirect_count = 0; + self.last_leader_address = None; + 
} +} + +impl Default for LeaderRedirectionState { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_same_address() { + assert!(is_same_address("127.0.0.1:8090", "127.0.0.1:8090")); + assert!(is_same_address("localhost:8090", "127.0.0.1:8090")); + assert!(!is_same_address("127.0.0.1:8090", "127.0.0.1:8091")); + assert!(!is_same_address("192.168.1.1:8090", "127.0.0.1:8090")); + } + + #[test] + fn test_normalize_address() { + assert_eq!(normalize_address("localhost:8090"), "127.0.0.1:8090"); + assert_eq!(normalize_address("LOCALHOST:8090"), "127.0.0.1:8090"); + assert_eq!(normalize_address("[::]:8090"), "[::1]:8090"); + } + + #[test] + fn test_leader_redirection_state() { + let mut state = LeaderRedirectionState::new(); + assert!(state.can_redirect()); + assert_eq!(state.redirect_count, 0); + + state.increment_redirect("127.0.0.1:8090".to_string()); + assert!(state.can_redirect()); + assert_eq!(state.redirect_count, 1); + + state.increment_redirect("127.0.0.1:8091".to_string()); + state.increment_redirect("127.0.0.1:8092".to_string()); + assert!(!state.can_redirect()); + assert_eq!(state.redirect_count, 3); + + state.reset(); + assert!(state.can_redirect()); + assert_eq!(state.redirect_count, 0); + } +} diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index c68b7dd5a..59604e296 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -22,6 +22,7 @@ pub mod client_wrappers; pub mod clients; pub mod consumer_ext; pub mod http; +mod leader_aware; pub mod prelude; pub mod quic; pub mod stream_builder; diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index d89afe86a..8780dd947 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -30,6 +30,7 @@ pub use crate::client_provider; pub use crate::client_provider::ClientProviderConfig; pub use crate::client_wrappers::client_wrapper::ClientWrapper; +pub use crate::client_wrappers::connection_info::ConnectionInfo; pub use 
crate::clients::client::IggyClient; pub use crate::clients::client_builder::IggyClientBuilder; pub use crate::clients::consumer::{ @@ -53,19 +54,19 @@ pub use iggy_binary_protocol::{ }; pub use iggy_common::{ Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, CacheMetrics, - CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, Consumer, - ConsumerGroupDetails, ConsumerKind, EncryptorKind, FlushUnsavedBuffer, GlobalPermissions, - HeaderKey, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind, Identifier, - IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry, IggyIndexView, IggyMessage, - IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, IggyMessageViewIterator, - IggyTimestamp, MaxTopicSize, Partition, Partitioner, Partitioning, Permissions, - PersonalAccessTokenExpiry, PollMessages, PolledMessages, PollingKind, PollingStrategy, - QuicClientConfig, QuicClientConfigBuilder, QuicClientReconnectionConfig, SendMessages, - Sizeable, SnapshotCompression, Stats, Stream, StreamDetails, StreamPermissions, - SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder, TcpClientReconnectionConfig, - Topic, TopicDetails, TopicPermissions, TransportProtocol, UserId, UserStatus, Validatable, - WebSocketClientConfig, WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, - defaults, locking, + CacheMetricsKey, ClientError, ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, + ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroupDetails, ConsumerKind, + EncryptorKind, FlushUnsavedBuffer, GlobalPermissions, HeaderKey, HeaderValue, HttpClientConfig, + HttpClientConfigBuilder, IdKind, Identifier, IdentityInfo, IggyByteSize, IggyDuration, + IggyError, IggyExpiry, IggyIndexView, IggyMessage, IggyMessageHeader, IggyMessageHeaderView, + IggyMessageView, IggyMessageViewIterator, IggyTimestamp, MaxTopicSize, Partition, Partitioner, + Partitioning, Permissions, PersonalAccessTokenExpiry, 
PollMessages, PolledMessages, + PollingKind, PollingStrategy, QuicClientConfig, QuicClientConfigBuilder, + QuicClientReconnectionConfig, SendMessages, Sizeable, SnapshotCompression, Stats, Stream, + StreamDetails, StreamPermissions, SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder, + TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportProtocol, UserId, + UserStatus, Validatable, WebSocketClientConfig, WebSocketClientConfigBuilder, + WebSocketClientReconnectionConfig, defaults, locking, }; pub use iggy_common::{ IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE, diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 831890cb4..ea15bf9c4 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -16,6 +16,7 @@ * under the License. */ +use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::AutoLogin; use iggy_binary_protocol::{ BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, UserClient, @@ -28,7 +29,7 @@ use async_trait::async_trait; use bytes::Bytes; use iggy_common::{ ClientState, Command, ConnectionString, ConnectionStringUtils, Credentials, DiagnosticEvent, - QuicConnectionStringOptions, TransportProtocol, + IggyErrorDiscriminants, QuicConnectionStringOptions, TransportProtocol, }; use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig; use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream, VarInt}; @@ -51,10 +52,11 @@ pub struct QuicClient { pub(crate) endpoint: Endpoint, pub(crate) connection: Arc<Mutex<Option<Connection>>>, pub(crate) config: Arc<QuicClientConfig>, - pub(crate) server_address: SocketAddr, pub(crate) state: Mutex<ClientState>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), - connected_at: Mutex<Option<IggyTimestamp>>, + pub(crate) connected_at: Mutex<Option<IggyTimestamp>>, + leader_redirection_state: 
Mutex<LeaderRedirectionState>, + pub(crate) current_server_address: Mutex<String>, } unsafe impl Send for QuicClient {} @@ -126,9 +128,10 @@ impl BinaryTransport for QuicClient { } self.disconnect().await?; + let server_address = self.current_server_address.lock().await.to_string(); info!( "Reconnecting to the server: {}, by client: {}", - self.config.server_address, self.config.client_address + server_address, self.config.client_address ); self.connect().await?; self.send_raw(code, payload).await @@ -195,11 +198,12 @@ impl QuicClient { Ok(Self { config, endpoint, - server_address, connection: Arc::new(Mutex::new(None)), state: Mutex::new(ClientState::Disconnected), events: broadcast(1000), connected_at: Mutex::new(None), + leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), + current_server_address: Mutex::new(server_address.to_string()), }) } @@ -235,11 +239,20 @@ impl QuicClient { .map_err(|_| IggyError::InvalidNumberEncoding)?, ); if status != 0 { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status) - ); + // Log FeatureUnavailable as debug instead of error (e.g., when clustering is disabled) + if status == IggyErrorDiscriminants::FeatureUnavailable as u32 { + tracing::debug!( + "Feature unavailable on server: {} ({})", + status, + IggyError::from_code_as_string(status) + ); + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status) + ); + } return Err(IggyError::from_code(status)); } @@ -260,134 +273,179 @@ impl QuicClient { } async fn connect(&self) -> Result<(), IggyError> { - match self.get_state().await { - ClientState::Shutdown => { - trace!("Cannot connect. 
Client is shutdown."); - return Err(IggyError::ClientShutdown); - } - ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { - trace!("Client is already connected."); - return Ok(()); - } - ClientState::Connecting => { - trace!("Client is already connecting."); - return Ok(()); + loop { + match self.get_state().await { + ClientState::Shutdown => { + trace!("Cannot connect. Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ClientState::Connected + | ClientState::Authenticating + | ClientState::Authenticated => { + trace!("Client is already connected."); + return Ok(()); + } + ClientState::Connecting => { + trace!("Client is already connecting."); + return Ok(()); + } + _ => {} } - _ => {} - } - self.set_state(ClientState::Connecting).await; - if let Some(connected_at) = self.connected_at.lock().await.as_ref() { - let now = IggyTimestamp::now(); - let elapsed = now.as_micros() - connected_at.as_micros(); - let interval = self.config.reconnection.reestablish_after.as_micros(); - trace!( - "Elapsed time since last connection: {}", - IggyDuration::from(elapsed) - ); - if elapsed < interval { - let remaining = IggyDuration::from(interval - elapsed); - info!("Trying to connect to the server in: {remaining}",); - sleep(remaining.get_duration()).await; + self.set_state(ClientState::Connecting).await; + if let Some(connected_at) = self.connected_at.lock().await.as_ref() { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros() - connected_at.as_micros(); + let interval = self.config.reconnection.reestablish_after.as_micros(); + trace!( + "Elapsed time since last connection: {}", + IggyDuration::from(elapsed) + ); + if elapsed < interval { + let remaining = IggyDuration::from(interval - elapsed); + info!("Trying to connect to the server in: {remaining}",); + sleep(remaining.get_duration()).await; + } } - } - let mut retry_count = 0; - let connection; - let remote_address; - loop { - info!( - "{NAME} client is 
connecting to server: {}...", - self.config.server_address - ); - let connection_result = self - .endpoint - .connect(self.server_address, &self.config.server_name) - .unwrap() - .await; - - if connection_result.is_err() { - error!( - "Failed to connect to server: {}", - self.config.server_address + let mut retry_count = 0; + let connection; + let remote_address; + loop { + let server_address_str = self.current_server_address.lock().await.clone(); + let server_address: SocketAddr = server_address_str.parse().map_err(|e| { + error!( + "Failed to parse server address '{}': {}", + server_address_str, e + ); + IggyError::InvalidServerAddress + })?; + info!( + "{NAME} client is connecting to server: {}...", + server_address ); - if !self.config.reconnection.enabled { - warn!("Automatic reconnection is disabled."); - return Err(IggyError::CannotEstablishConnection); - } + let connection_result = self + .endpoint + .connect(server_address, &self.config.server_name) + .unwrap() + .await; + + if connection_result.is_err() { + error!("Failed to connect to server: {}", server_address); + if !self.config.reconnection.enabled { + warn!("Automatic reconnection is disabled."); + return Err(IggyError::CannotEstablishConnection); + } - let unlimited_retries = self.config.reconnection.max_retries.is_none(); - let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); - let max_retries_str = - if let Some(max_retries) = self.config.reconnection.max_retries { - max_retries.to_string() - } else { - "unlimited".to_string() - }; + let unlimited_retries = self.config.reconnection.max_retries.is_none(); + let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); + let max_retries_str = + if let Some(max_retries) = self.config.reconnection.max_retries { + max_retries.to_string() + } else { + "unlimited".to_string() + }; + + let interval_str = self.config.reconnection.interval.as_human_time_string(); + if unlimited_retries || retry_count < max_retries { + 
retry_count += 1; + info!( + "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", + server_address, + ); + sleep(self.config.reconnection.interval.get_duration()).await; + continue; + } - let interval_str = self.config.reconnection.interval.as_human_time_string(); - if unlimited_retries || retry_count < max_retries { - retry_count += 1; - info!( - "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", - self.config.server_address, - ); - sleep(self.config.reconnection.interval.get_duration()).await; - continue; + self.set_state(ClientState::Disconnected).await; + self.publish_event(DiagnosticEvent::Disconnected).await; + return Err(IggyError::CannotEstablishConnection); } - self.set_state(ClientState::Disconnected).await; - self.publish_event(DiagnosticEvent::Disconnected).await; - return Err(IggyError::CannotEstablishConnection); + connection = connection_result.map_err(|error| { + error!("Failed to establish QUIC connection: {error}"); + IggyError::CannotEstablishConnection + })?; + remote_address = connection.remote_address(); + break; } - connection = connection_result.map_err(|error| { - error!("Failed to establish QUIC connection: {error}"); - IggyError::CannotEstablishConnection - })?; - remote_address = connection.remote_address(); - break; - } - - let now = IggyTimestamp::now(); - info!("{NAME} client has connected to server: {remote_address} at {now}",); - self.set_state(ClientState::Connected).await; - self.connection.lock().await.replace(connection); - self.connected_at.lock().await.replace(now); - self.publish_event(DiagnosticEvent::Connected).await; - - match &self.config.auto_login { - AutoLogin::Disabled => { - info!("Automatic sign-in is disabled."); - Ok(()) - } - AutoLogin::Enabled(credentials) => { - info!( - "{NAME} client: {} is signing in...", - self.config.client_address - ); - self.set_state(ClientState::Authenticating).await; - match credentials { - 
Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; - self.publish_event(DiagnosticEvent::SignedIn).await; - info!( - "{NAME} client: {} has signed in with the user credentials, username: {username}", - self.config.client_address - ); - Ok(()) + let now = IggyTimestamp::now(); + info!("{NAME} client has connected to server: {remote_address} at {now}",); + self.set_state(ClientState::Connected).await; + self.connection.lock().await.replace(connection); + self.connected_at.lock().await.replace(now); + self.publish_event(DiagnosticEvent::Connected).await; + + // Handle auto-login + let should_redirect = match &self.config.auto_login { + AutoLogin::Disabled => { + info!("Automatic sign-in is disabled."); + false + } + AutoLogin::Enabled(credentials) => { + info!( + "{NAME} client: {} is signing in...", + self.config.client_address + ); + self.set_state(ClientState::Authenticating).await; + match credentials { + Credentials::UsernamePassword(username, password) => { + self.login_user(username, password).await?; + self.publish_event(DiagnosticEvent::SignedIn).await; + info!( + "{NAME} client: {} has signed in with the user credentials, username: {username}", + self.config.client_address + ); + } + Credentials::PersonalAccessToken(token) => { + self.login_with_personal_access_token(token).await?; + self.publish_event(DiagnosticEvent::SignedIn).await; + info!( + "{NAME} client: {} has signed in with a personal access token.", + self.config.client_address + ); + } } - Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; - self.publish_event(DiagnosticEvent::SignedIn).await; - info!( - "{NAME} client: {} has signed in with a personal access token.", - self.config.client_address - ); - Ok(()) + + // Check if we need to redirect to leader + let current_address = self.current_server_address.lock().await.clone(); + let leader_address = + check_and_redirect_to_leader(self, 
&current_address).await?; + if let Some(new_leader_address) = leader_address { + let mut redirection_state = self.leader_redirection_state.lock().await; + if !redirection_state.can_redirect() { + warn!( + "Maximum leader redirections reached, continuing with current connection" + ); + false + } else { + info!( + "Current node is not leader, redirecting to leader at: {}", + new_leader_address + ); + redirection_state.increment_redirect(new_leader_address.clone()); + drop(redirection_state); + + // Clear connected_at to avoid reestablish_after delay during redirection + self.connected_at.lock().await.take(); + self.disconnect().await?; + *self.current_server_address.lock().await = new_leader_address; + + true + } + } else { + self.leader_redirection_state.lock().await.reset(); + false } } + }; + + if should_redirect { + continue; } + + return Ok(()); } } diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index c4adc3f5f..ded3a89ff 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -16,6 +16,7 @@ * under the License. 
*/ +use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::Client; use crate::prelude::TcpClientConfig; use crate::tcp::tcp_connection_stream::TcpConnectionStream; @@ -53,7 +54,9 @@ pub struct TcpClient { pub(crate) state: Mutex<ClientState>, client_address: Mutex<Option<SocketAddr>>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), - connected_at: Mutex<Option<IggyTimestamp>>, + pub(crate) connected_at: Mutex<Option<IggyTimestamp>>, + leader_redirection_state: Mutex<LeaderRedirectionState>, + pub(crate) current_server_address: Mutex<String>, } impl Default for TcpClient { @@ -129,9 +132,10 @@ impl BinaryTransport for TcpClient { { let client_address = self.get_client_address_value().await; + let server_address = self.current_server_address.lock().await.clone(); info!( "Reconnecting to the server: {} by client: {client_address}...", - self.config.server_address + server_address ); } @@ -191,6 +195,7 @@ impl TcpClient { /// Create a new TCP client based on the provided configuration. 
pub fn create(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> { + let server_address = config.server_address.clone(); Ok(Self { config, client_address: Mutex::new(None), @@ -198,6 +203,8 @@ impl TcpClient { state: Mutex::new(ClientState::Disconnected), events: broadcast(1000), connected_at: Mutex::new(None), + leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), + current_server_address: Mutex::new(server_address), }) } @@ -219,6 +226,13 @@ impl TcpClient { status, IggyError::from_code_as_string(status) ) + } else if status == IggyErrorDiscriminants::FeatureUnavailable as u32 { + // Feature unavailable - likely clustering is disabled on server + tracing::debug!( + "Feature unavailable on server: {} ({})", + status, + IggyError::from_code_as_string(status) + ) } else { error!( "Received an invalid response with status: {} ({}).", @@ -242,208 +256,248 @@ impl TcpClient { } async fn connect(&self) -> Result<(), IggyError> { - match self.get_state().await { - ClientState::Shutdown => { - trace!("Cannot connect. Client is shutdown."); - return Err(IggyError::ClientShutdown); - } - ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { - let client_address = self.get_client_address_value().await; - trace!("Client: {client_address} is already connected."); - return Ok(()); - } - ClientState::Connecting => { - trace!("Client is already connecting."); - return Ok(()); + loop { + match self.get_state().await { + ClientState::Shutdown => { + trace!("Cannot connect. 
Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ClientState::Connected + | ClientState::Authenticating + | ClientState::Authenticated => { + let client_address = self.get_client_address_value().await; + trace!("Client: {client_address} is already connected."); + return Ok(()); + } + ClientState::Connecting => { + trace!("Client is already connecting."); + return Ok(()); + } + _ => {} } - _ => {} - } - self.set_state(ClientState::Connecting).await; - if let Some(connected_at) = self.connected_at.lock().await.as_ref() { - let now = IggyTimestamp::now(); - let elapsed = now.as_micros() - connected_at.as_micros(); - let interval = self.config.reconnection.reestablish_after.as_micros(); - trace!( - "Elapsed time since last connection: {}", - IggyDuration::from(elapsed) - ); - if elapsed < interval { - let remaining = IggyDuration::from(interval - elapsed); - info!("Trying to connect to the server in: {remaining}",); - sleep(remaining.get_duration()).await; + self.set_state(ClientState::Connecting).await; + if let Some(connected_at) = self.connected_at.lock().await.as_ref() { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros() - connected_at.as_micros(); + let interval = self.config.reconnection.reestablish_after.as_micros(); + trace!( + "Elapsed time since last connection: {}", + IggyDuration::from(elapsed) + ); + if elapsed < interval { + let remaining = IggyDuration::from(interval - elapsed); + info!("Trying to connect to the server in: {remaining}",); + sleep(remaining.get_duration()).await; + } } - } - let tls_enabled = self.config.tls_enabled; - let mut retry_count = 0; - let connection_stream: ConnectionStreamKind; - let remote_address; - let client_address; - loop { - info!( - "{NAME} client is connecting to server: {}...", - self.config.server_address - ); - - let connection = TcpStream::connect(&self.config.server_address).await; - if let Err(err) = &connection { - error!( - "Failed to connect to server: {}. 
Error: {}", - self.config.server_address, err + let tls_enabled = self.config.tls_enabled; + let mut retry_count = 0; + let connection_stream: ConnectionStreamKind; + let remote_address; + let client_address; + loop { + let server_address = self.current_server_address.lock().await.clone(); + info!( + "{NAME} client is connecting to server: {}...", + server_address ); - if !self.config.reconnection.enabled { - warn!("Automatic reconnection is disabled."); - return Err(IggyError::CannotEstablishConnection); - } - let unlimited_retries = self.config.reconnection.max_retries.is_none(); - let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); - let max_retries_str = - if let Some(max_retries) = self.config.reconnection.max_retries { - max_retries.to_string() - } else { - "unlimited".to_string() - }; - - let interval_str = self.config.reconnection.interval.as_human_time_string(); - if unlimited_retries || retry_count < max_retries { - retry_count += 1; - info!( - "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", - self.config.server_address, + let connection = TcpStream::connect(&server_address).await; + if let Err(err) = &connection { + error!( + "Failed to connect to server: {}. 
Error: {}", + server_address, err ); - sleep(self.config.reconnection.interval.get_duration()).await; - continue; + if !self.config.reconnection.enabled { + warn!("Automatic reconnection is disabled."); + return Err(IggyError::CannotEstablishConnection); + } + + let unlimited_retries = self.config.reconnection.max_retries.is_none(); + let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); + let max_retries_str = + if let Some(max_retries) = self.config.reconnection.max_retries { + max_retries.to_string() + } else { + "unlimited".to_string() + }; + + let interval_str = self.config.reconnection.interval.as_human_time_string(); + if unlimited_retries || retry_count < max_retries { + retry_count += 1; + info!( + "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", + server_address, + ); + sleep(self.config.reconnection.interval.get_duration()).await; + continue; + } + + self.set_state(ClientState::Disconnected).await; + self.publish_event(DiagnosticEvent::Disconnected).await; + return Err(IggyError::CannotEstablishConnection); } - self.set_state(ClientState::Disconnected).await; - self.publish_event(DiagnosticEvent::Disconnected).await; - return Err(IggyError::CannotEstablishConnection); - } + let stream = connection.map_err(|error| { + error!("Failed to establish TCP connection to the server: {error}",); + IggyError::CannotEstablishConnection + })?; + client_address = stream.local_addr().map_err(|error| { + error!("Failed to get the local address of the client: {error}",); + IggyError::CannotEstablishConnection + })?; + remote_address = stream.peer_addr().map_err(|error| { + error!("Failed to get the remote address of the server: {error}",); + IggyError::CannotEstablishConnection + })?; + self.client_address.lock().await.replace(client_address); - let stream = connection.map_err(|error| { - error!("Failed to establish TCP connection to the server: {error}",); - IggyError::CannotEstablishConnection - })?; - 
client_address = stream.local_addr().map_err(|error| { - error!("Failed to get the local address of the client: {error}",); - IggyError::CannotEstablishConnection - })?; - remote_address = stream.peer_addr().map_err(|error| { - error!("Failed to get the remote address of the server: {error}",); - IggyError::CannotEstablishConnection - })?; - self.client_address.lock().await.replace(client_address); - - if let Err(e) = stream.set_nodelay(self.config.nodelay) { - error!("Failed to set the nodelay option on the client: {e}, continuing...",); - } + if let Err(e) = stream.set_nodelay(self.config.nodelay) { + error!("Failed to set the nodelay option on the client: {e}, continuing...",); + } - if !tls_enabled { - connection_stream = - ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream)); - break; - } + if !tls_enabled { + connection_stream = + ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream)); + break; + } - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - - let config = if self.config.tls_validate_certificate { - let mut root_cert_store = rustls::RootCertStore::empty(); - if let Some(certificate_path) = &self.config.tls_ca_file { - for cert in - CertificateDer::pem_file_iter(certificate_path).map_err(|error| { - error!("Failed to read the CA file: {certificate_path}. {error}",); - IggyError::InvalidTlsCertificatePath - })? - { - let certificate = cert.map_err(|error| { + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let config = if self.config.tls_validate_certificate { + let mut root_cert_store = rustls::RootCertStore::empty(); + if let Some(certificate_path) = &self.config.tls_ca_file { + for cert in + CertificateDer::pem_file_iter(certificate_path).map_err(|error| { + error!("Failed to read the CA file: {certificate_path}. {error}",); + IggyError::InvalidTlsCertificatePath + })? 
+ { + let certificate = cert.map_err(|error| { error!( "Failed to read a certificate from the CA file: {certificate_path}. {error}", ); IggyError::InvalidTlsCertificate })?; - root_cert_store.add(certificate).map_err(|error| { + root_cert_store.add(certificate).map_err(|error| { error!( "Failed to add a certificate to the root certificate store. {error}", ); IggyError::InvalidTlsCertificate })?; + } + } else { + root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); } + + rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth() + } else { + use crate::tcp::tcp_tls_verifier::NoServerVerification; + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoServerVerification)) + .with_no_client_auth() + }; + let connector = TlsConnector::from(Arc::new(config)); + let tls_domain = if self.config.tls_domain.is_empty() { + // Extract hostname/IP from server_address when tls_domain is not specified + server_address + .split(':') + .next() + .unwrap_or(&server_address) + .to_string() } else { - root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + self.config.tls_domain.to_owned() + }; + let domain = ServerName::try_from(tls_domain).map_err(|error| { + error!("Failed to create a server name from the domain. 
{error}",); + IggyError::InvalidTlsDomain + })?; + let stream = connector.connect(domain, stream).await.map_err(|error| { + error!("Failed to establish a TLS connection to the server: {error}",); + IggyError::CannotEstablishConnection + })?; + connection_stream = ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new( + client_address, + TlsStream::Client(stream), + )); + break; + } + + let now = IggyTimestamp::now(); + info!( + "{NAME} client: {client_address} has connected to server: {remote_address} at: {now}", + ); + self.stream.lock().await.replace(connection_stream); + self.set_state(ClientState::Connected).await; + self.connected_at.lock().await.replace(now); + self.publish_event(DiagnosticEvent::Connected).await; + // Handle auto-login + let should_redirect = match &self.config.auto_login { + AutoLogin::Disabled => { + info!("Automatic sign-in is disabled."); + false } + AutoLogin::Enabled(credentials) => { + info!("{NAME} client: {client_address} is signing in..."); + self.set_state(ClientState::Authenticating).await; + match credentials { + Credentials::UsernamePassword(username, password) => { + self.login_user(username, password).await?; + info!( + "{NAME} client: {client_address} has signed in with the user credentials, username: {username}", + ); + } + Credentials::PersonalAccessToken(token) => { + self.login_with_personal_access_token(token).await?; + info!( + "{NAME} client: {client_address} has signed in with a personal access token.", + ); + } + } - rustls::ClientConfig::builder() - .with_root_certificates(root_cert_store) - .with_no_client_auth() - } else { - use crate::tcp::tcp_tls_verifier::NoServerVerification; - rustls::ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(NoServerVerification)) - .with_no_client_auth() - }; - let connector = TlsConnector::from(Arc::new(config)); - let tls_domain = if self.config.tls_domain.is_empty() { - // Extract hostname/IP from server_address when tls_domain is not 
specified - self.config - .server_address - .split(':') - .next() - .unwrap_or(&self.config.server_address) - .to_string() - } else { - self.config.tls_domain.to_owned() - }; - let domain = ServerName::try_from(tls_domain).map_err(|error| { - error!("Failed to create a server name from the domain. {error}",); - IggyError::InvalidTlsDomain - })?; - let stream = connector.connect(domain, stream).await.map_err(|error| { - error!("Failed to establish a TLS connection to the server: {error}",); - IggyError::CannotEstablishConnection - })?; - connection_stream = ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new( - client_address, - TlsStream::Client(stream), - )); - break; - } + // Check if we need to redirect to leader + let current_address = self.current_server_address.lock().await.clone(); + let leader_address = + check_and_redirect_to_leader(self, &current_address).await?; + if let Some(new_leader_address) = leader_address { + let mut redirection_state = self.leader_redirection_state.lock().await; + if !redirection_state.can_redirect() { + warn!( + "Maximum leader redirections reached, continuing with current connection" + ); + false + } else { + info!( + "Current node is not leader, redirecting to leader at: {}", + new_leader_address + ); + redirection_state.increment_redirect(new_leader_address.clone()); + drop(redirection_state); - let now = IggyTimestamp::now(); - info!( - "{NAME} client: {client_address} has connected to server: {remote_address} at: {now}", - ); - self.stream.lock().await.replace(connection_stream); - self.set_state(ClientState::Connected).await; - self.connected_at.lock().await.replace(now); - self.publish_event(DiagnosticEvent::Connected).await; - match &self.config.auto_login { - AutoLogin::Disabled => { - info!("Automatic sign-in is disabled."); - Ok(()) - } - AutoLogin::Enabled(credentials) => { - info!("{NAME} client: {client_address} is signing in..."); - self.set_state(ClientState::Authenticating).await; - match credentials { - 
Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; - info!( - "{NAME} client: {client_address} has signed in with the user credentials, username: {username}", - ); - Ok(()) - } - Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; - info!( - "{NAME} client: {client_address} has signed in with a personal access token.", - ); - Ok(()) + // Clear connected_at to avoid reestablish_after delay during redirection + self.connected_at.lock().await.take(); + self.disconnect().await?; + + *self.current_server_address.lock().await = new_leader_address; + true + } + } else { + self.leader_redirection_state.lock().await.reset(); + false } } + }; + + if should_redirect { + continue; } + + return Ok(()); } } diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 5556752ee..d46284cfc 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -16,6 +16,7 @@ * under the License. */ +use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::websocket::websocket_connection_stream::WebSocketConnectionStream; use crate::websocket::websocket_stream_kind::WebSocketStreamKind; use crate::websocket::websocket_tls_connection_stream::WebSocketTlsConnectionStream; @@ -53,7 +54,9 @@ pub struct WebSocketClient { pub(crate) state: Mutex<ClientState>, client_address: Mutex<Option<SocketAddr>>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), - connected_at: Mutex<Option<IggyTimestamp>>, + pub(crate) connected_at: Mutex<Option<IggyTimestamp>>, + leader_redirection_state: Mutex<LeaderRedirectionState>, + pub(crate) current_server_address: Mutex<String>, } impl Default for WebSocketClient { @@ -149,6 +152,7 @@ impl WebSocketClient { /// Create a new WebSocket client with the provided configuration. 
pub fn create(config: Arc<WebSocketClientConfig>) -> Result<Self, IggyError> { let (sender, receiver) = broadcast(1000); + let server_address = config.server_address.clone(); Ok(WebSocketClient { stream: Arc::new(Mutex::new(None)), config, @@ -156,6 +160,8 @@ impl WebSocketClient { client_address: Mutex::new(None), events: (sender, receiver), connected_at: Mutex::new(None), + leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), + current_server_address: Mutex::new(server_address), }) } @@ -176,80 +182,82 @@ impl WebSocketClient { } async fn connect(&self) -> Result<(), IggyError> { - if self.get_state().await == ClientState::Connected { - return Ok(()); - } + loop { + if self.get_state().await == ClientState::Connected { + return Ok(()); + } - let mut retry_count = 0; + let mut retry_count = 0; - loop { - let protocol = if self.config.tls_enabled { "wss" } else { "ws" }; - info!( - "{NAME} client is connecting to server: {}://{}...", - protocol, self.config.server_address - ); - self.set_state(ClientState::Connecting).await; - - if retry_count > 0 { - let elapsed = self - .connected_at - .lock() - .await - .map(|ts| IggyTimestamp::now().as_micros() - ts.as_micros()) - .unwrap_or(0); - - let interval = self.config.reconnection.reestablish_after.as_micros(); - debug!("Elapsed time since last connection: {}μs", elapsed); - - if elapsed < interval { - let remaining = - IggyDuration::new(std::time::Duration::from_micros(interval - elapsed)); - info!("Trying to connect to the server in: {remaining}"); - sleep(remaining.get_duration()).await; + loop { + let current_address = self.current_server_address.lock().await.clone(); + let protocol = if self.config.tls_enabled { "wss" } else { "ws" }; + info!( + "{NAME} client is connecting to server: {}://{}...", + protocol, current_address + ); + self.set_state(ClientState::Connecting).await; + + if retry_count > 0 { + let elapsed = self + .connected_at + .lock() + .await + .map(|ts| 
IggyTimestamp::now().as_micros() - ts.as_micros()) + .unwrap_or(0); + + let interval = self.config.reconnection.reestablish_after.as_micros(); + debug!("Elapsed time since last connection: {}μs", elapsed); + + if elapsed < interval { + let remaining = + IggyDuration::new(std::time::Duration::from_micros(interval - elapsed)); + info!("Trying to connect to the server in: {remaining}"); + sleep(remaining.get_duration()).await; + } } - } - let server_addr = self - .config - .server_address - .parse::<SocketAddr>() - .map_err(|_| { - error!("Invalid server address: {}", self.config.server_address); + let server_addr = current_address.parse::<SocketAddr>().map_err(|_| { + error!("Invalid server address: {}", current_address); IggyError::InvalidConfiguration })?; - let connection_stream = if self.config.tls_enabled { - match self.connect_tls(server_addr, &mut retry_count).await { - Ok(stream) => stream, - Err(IggyError::CannotEstablishConnection) => { - return Err(IggyError::CannotEstablishConnection); + let connection_stream = if self.config.tls_enabled { + match self.connect_tls(server_addr, &mut retry_count).await { + Ok(stream) => stream, + Err(IggyError::CannotEstablishConnection) => { + return Err(IggyError::CannotEstablishConnection); + } + Err(_) => continue, // retry } - Err(_) => continue, // retry - } - } else { - match self.connect_plain(server_addr, &mut retry_count).await { - Ok(stream) => stream, - Err(IggyError::CannotEstablishConnection) => { - return Err(IggyError::CannotEstablishConnection); + } else { + match self.connect_plain(server_addr, &mut retry_count).await { + Ok(stream) => stream, + Err(IggyError::CannotEstablishConnection) => { + return Err(IggyError::CannotEstablishConnection); + } + Err(_) => continue, // retry } - Err(_) => continue, // retry - } - }; - - *self.stream.lock().await = Some(connection_stream); - *self.client_address.lock().await = Some(server_addr); - self.set_state(ClientState::Connected).await; - 
*self.connected_at.lock().await = Some(IggyTimestamp::now()); - self.publish_event(DiagnosticEvent::Connected).await; + }; + + *self.stream.lock().await = Some(connection_stream); + *self.client_address.lock().await = Some(server_addr); + self.set_state(ClientState::Connected).await; + *self.connected_at.lock().await = Some(IggyTimestamp::now()); + self.publish_event(DiagnosticEvent::Connected).await; + + let now = IggyTimestamp::now(); + info!( + "{NAME} client has connected to server: {} at: {now}", + server_addr + ); - let now = IggyTimestamp::now(); - info!( - "{NAME} client has connected to server: {} at: {now}", - server_addr - ); + break; + } - self.auto_login().await?; - return Ok(()); + if !self.check_and_maybe_redirect().await? { + return Ok(()); + } } } @@ -415,6 +423,41 @@ impl WebSocketClient { Err(IggyError::CannotEstablishConnection) } + async fn check_and_maybe_redirect(&self) -> Result<bool, IggyError> { + match &self.config.auto_login { + AutoLogin::Disabled => Ok(false), + AutoLogin::Enabled(_) => { + self.auto_login().await?; + + let current_address = self.current_server_address.lock().await.clone(); + let leader_address = check_and_redirect_to_leader(self, &current_address).await?; + + if let Some(new_leader_address) = leader_address { + let mut redirection_state = self.leader_redirection_state.lock().await; + if !redirection_state.can_redirect() { + warn!("Maximum leader redirections reached for WebSocket client"); + return Ok(false); + } + + redirection_state.increment_redirect(new_leader_address.clone()); + drop(redirection_state); + + info!( + "WebSocket client redirecting to leader at: {}", + new_leader_address + ); + self.connected_at.lock().await.take(); + self.disconnect().await?; + *self.current_server_address.lock().await = new_leader_address; + Ok(true) + } else { + self.leader_redirection_state.lock().await.reset(); + Ok(false) + } + } + } + } + async fn auto_login(&self) -> Result<(), IggyError> { let client_address = 
self.get_client_address_value().await; match &self.config.auto_login { diff --git a/core/server/src/args.rs b/core/server/src/args.rs index 3a6cfdb96..8ec1dc09d 100644 --- a/core/server/src/args.rs +++ b/core/server/src/args.rs @@ -105,4 +105,19 @@ pub struct Args { /// iggy-server --with-default-root-credentials # Use 'iggy/iggy' as root credentials #[arg(long, default_value_t = false, verbatim_doc_comment)] pub with_default_root_credentials: bool, + + /// Run server as a follower node (FOR TESTING LEADER REDIRECTION) + /// + /// When this flag is set, the server will report itself as a follower node + /// in cluster metadata responses. This is useful for testing leader-aware + /// client connections and redirection logic. + /// + /// The server will return cluster metadata showing this server as a follower node. + /// + /// Examples: + /// iggy-server # Run as leader (default) + /// iggy-server --follower # Run as follower + /// IGGY_TCP_ADDRESS=127.0.0.1:8091 iggy-server --follower # Follower on port 8091 + #[arg(long, default_value_t = false, verbatim_doc_comment)] + pub follower: bool, } diff --git a/core/server/src/configs/cluster.rs b/core/server/src/configs/cluster.rs index 25aae2201..3a0d4506c 100644 --- a/core/server/src/configs/cluster.rs +++ b/core/server/src/configs/cluster.rs @@ -16,6 +16,7 @@ * under the License. */ +use iggy_common::TransportProtocol; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -23,7 +24,7 @@ pub struct ClusterConfig { pub enabled: bool, pub name: String, pub id: u32, - pub transport: String, + pub transport: TransportProtocol, pub node: NodeConfig, pub nodes: Vec<ClusterNodeConfig>, } diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs index 049f06ca0..792b8f434 100644 --- a/core/server/src/configs/validators.rs +++ b/core/server/src/configs/validators.rs @@ -16,8 +16,7 @@ * under the License. 
*/ -extern crate sysinfo; - +use super::cluster::ClusterConfig; use super::server::{ DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, TelemetryConfig, }; @@ -61,6 +60,9 @@ impl Validatable<ConfigError> for ServerConfig { self.system.sharding.validate().with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to validate sharding config") })?; + self.cluster.validate().with_error(|error| { + format!("{COMPONENT} (error: {error}) - failed to validate cluster config") + })?; let topic_size = match self.system.topic.max_size { MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()), @@ -322,3 +324,93 @@ impl Validatable<ConfigError> for ShardingConfig { } } } + +impl Validatable<ConfigError> for ClusterConfig { + fn validate(&self) -> Result<(), ConfigError> { + if !self.enabled { + return Ok(()); + } + + // Validate cluster name is not empty + if self.name.trim().is_empty() { + eprintln!("Invalid cluster configuration: cluster name cannot be empty"); + return Err(ConfigError::InvalidConfiguration); + } + + // Validate nodes list is not empty + if self.nodes.is_empty() { + eprintln!("Invalid cluster configuration: nodes list cannot be empty"); + return Err(ConfigError::InvalidConfiguration); + } + + // Check if nodes start from ID 0 + let has_node_zero = self.nodes.iter().any(|node| node.id == 0); + if !has_node_zero { + eprintln!("Invalid cluster configuration: nodes must start from ID 0"); + return Err(ConfigError::InvalidConfiguration); + } + + // Check if current node ID exists in nodes vector + let current_node_exists = self.nodes.iter().any(|node| node.id == self.node.id); + if !current_node_exists { + eprintln!( + "Invalid cluster configuration: current node ID {} not found in nodes list", + self.node.id + ); + return Err(ConfigError::InvalidConfiguration); + } + + // Check for duplicate node IDs + let mut node_ids = std::collections::HashSet::new(); + for node in &self.nodes { + if !node_ids.insert(node.id) { + eprintln!( + 
"Invalid cluster configuration: duplicate node ID {} found", + node.id + ); + return Err(ConfigError::InvalidConfiguration); + } + } + + // Validate unique addresses (IP:port combinations) + let mut addresses = std::collections::HashSet::new(); + for node in &self.nodes { + // Validate address format (should contain IP:port or [IPv6]:port) + let is_valid_address = if node.address.starts_with('[') { + // IPv6 address format: [::1]:8090 + node.address.contains("]:") && node.address.matches(':').count() >= 2 + } else { + // IPv4 address format: 127.0.0.1:8090 + node.address.matches(':').count() == 1 + }; + + if !is_valid_address { + eprintln!( + "Invalid cluster configuration: malformed address '{}' for node ID {}", + node.address, node.id + ); + return Err(ConfigError::InvalidConfiguration); + } + + // Check for duplicate full addresses + if !addresses.insert(node.address.clone()) { + eprintln!( + "Invalid cluster configuration: duplicate address {} found (node ID: {})", + node.address, node.id + ); + return Err(ConfigError::InvalidConfiguration); + } + + // Validate node name is not empty + if node.name.trim().is_empty() { + eprintln!( + "Invalid cluster configuration: node name cannot be empty for node ID {}", + node.id + ); + return Err(ConfigError::InvalidConfiguration); + } + } + + Ok(()) + } +} diff --git a/core/server/src/main.rs b/core/server/src/main.rs index becbfaf50..896eace55 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -82,6 +82,7 @@ async fn main() -> Result<(), ServerError> { ); } let args = Args::parse(); + let is_follower = args.follower; // FIRST DISCRETE LOADING STEP. // Load config and create directories. @@ -115,6 +116,10 @@ async fn main() -> Result<(), ServerError> { // From this point on, we can use tracing macros to log messages. 
logging.late_init(config.system.get_system_path(), &config.system.logging)?; + if is_follower { + info!("Server is running in FOLLOWER mode for testing leader redirection"); + } + if args.with_default_root_credentials { let username_set = std::env::var("IGGY_ROOT_USERNAME").is_ok(); let password_set = std::env::var("IGGY_ROOT_PASSWORD").is_ok(); @@ -357,6 +362,7 @@ async fn main() -> Result<(), ServerError> { .encryptor(encryptor) .version(current_version) .metrics(metrics) + .is_follower(is_follower) .build(); let shard = Rc::new(shard); diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 57140e68a..8f842fecf 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -25,8 +25,7 @@ use crate::streaming::session::Session; use anyhow::anyhow; use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use futures::FutureExt; -use iggy_common::IggyError; -use iggy_common::TransportProtocol; +use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError, TransportProtocol}; use std::rc::Rc; use tracing::{debug, error, info, trace}; @@ -198,21 +197,32 @@ async fn handle_stream( Ok(()) } Err(e) => { - error!( - "Command was not handled successfully, session: {:?}, error: {e}.", - session - ); - // Only return a connection-terminating error for client not found - if let IggyError::ClientNotFound(_) = e { - sender.send_error_response(e.clone()).await?; - trace!("QUIC error response was sent."); - error!("Session will be deleted."); - Err(anyhow!("Client not found: {e}")) - } else { - // For all other errors, send response and continue the connection + // Special handling for GetClusterMetadata when clustering is disabled + if code == GET_CLUSTER_METADATA_CODE && matches!(e, IggyError::FeatureUnavailable) { + debug!( + "GetClusterMetadata command not available (clustering disabled), session: {:?}.", + session + ); sender.send_error_response(e).await?; trace!("QUIC error response was sent."); Ok(()) + } else { + 
error!( + "Command was not handled successfully, session: {:?}, error: {e}.", + session + ); + // Only return a connection-terminating error for client not found + if let IggyError::ClientNotFound(_) = e { + sender.send_error_response(e.clone()).await?; + trace!("QUIC error response was sent."); + error!("Session will be deleted."); + Err(anyhow!("Client not found: {e}")) + } else { + // For all other errors, send response and continue the connection + sender.send_error_response(e).await?; + trace!("QUIC error response was sent."); + Ok(()) + } } } } diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 37e3099ec..98914543e 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -48,6 +48,7 @@ pub struct IggyShardBuilder { encryptor: Option<EncryptorKind>, version: Option<SemanticVersion>, metrics: Option<Metrics>, + is_follower: bool, } impl IggyShardBuilder { @@ -109,6 +110,11 @@ impl IggyShardBuilder { self } + pub fn is_follower(mut self, is_follower: bool) -> Self { + self.is_follower = is_follower; + self + } + // TODO: Too much happens in there, some of those bootstrapping logic should be moved outside. 
pub fn build(self) -> IggyShard { let id = self.id.unwrap(); @@ -161,6 +167,7 @@ impl IggyShardBuilder { stop_receiver, messages_receiver: Cell::new(Some(frame_receiver)), metrics, + is_follower: self.is_follower, is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), quic_bound_address: Cell::new(None), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 9d272c417..64243f51e 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -86,6 +86,7 @@ pub struct IggyShard { pub(crate) permissioner: RefCell<Permissioner>, pub(crate) users: Users, pub(crate) metrics: Metrics, + pub(crate) is_follower: bool, pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>, pub(crate) stop_receiver: StopReceiver, pub(crate) is_shutting_down: AtomicBool, diff --git a/core/server/src/shard/system/cluster.rs b/core/server/src/shard/system/cluster.rs index 1619bb8aa..e4af6d724 100644 --- a/core/server/src/shard/system/cluster.rs +++ b/core/server/src/shard/system/cluster.rs @@ -18,10 +18,7 @@ use crate::shard::IggyShard; use crate::streaming::session::Session; -use iggy_common::{ - ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError, TransportProtocol, -}; -use std::str::FromStr; +use iggy_common::{ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError}; use tracing::trace; impl IggyShard { @@ -35,12 +32,13 @@ impl IggyShard { // TODO(hubcio): Clustering is not yet implemented // The leader/follower as well as node status are currently placeholder implementations. 
- let name = self.config.cluster.name.clone(); - let id = self.config.cluster.id; + let cluster_name = self.config.cluster.name.clone(); + let cluster_id = self.config.cluster.id; - // Parse transport string to TransportProtocol enum - let transport = TransportProtocol::from_str(&self.config.cluster.transport) - .map_err(|_| IggyError::InvalidConfiguration)?; + // Cannot fail because we validated it in config + let transport = self.config.cluster.transport; + + let own_node_id = self.config.cluster.node.id; let nodes: Vec<ClusterNode> = self .config @@ -48,14 +46,26 @@ impl IggyShard { .nodes .iter() .map(|node_config| { - let role = if node_config.id == 1 { - ClusterNodeRole::Leader + let (role, status) = if node_config.id == own_node_id { + ( + if self.is_follower { + ClusterNodeRole::Follower + } else { + ClusterNodeRole::Leader + }, + ClusterNodeStatus::Healthy, + ) } else { - ClusterNodeRole::Follower + ( + if self.is_follower { + ClusterNodeRole::Leader + } else { + ClusterNodeRole::Follower + }, + ClusterNodeStatus::Healthy, + ) }; - let status = ClusterNodeStatus::Healthy; - ClusterNode { id: node_config.id, name: node_config.name.clone(), @@ -67,8 +77,8 @@ impl IggyShard { .collect(); Ok(ClusterMetadata { - name, - id, + name: cluster_name, + id: cluster_id, transport, nodes, }) diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 4df84fa0d..30e142e44 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -25,7 +25,7 @@ use crate::tcp::connection_handler::command::ServerCommand; use async_channel::Receiver; use bytes::BytesMut; use futures::FutureExt; -use iggy_common::IggyError; +use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError}; use std::io::ErrorKind; use std::rc::Rc; use tracing::{debug, error, info}; @@ -89,19 +89,30 @@ pub(crate) async fn handle_connection( ); } Err(error) => { - error!( - "Command with code {cmd_code} was not handled 
successfully, session: {session}, error: {error}." - ); - if let IggyError::ClientNotFound(_) = error { + // Special handling for GetClusterMetadata when clustering is disabled + if cmd_code == GET_CLUSTER_METADATA_CODE + && matches!(error, IggyError::FeatureUnavailable) + { + debug!( + "GetClusterMetadata command not available (clustering disabled), session: {session}." + ); sender.send_error_response(error).await?; debug!("TCP error response was sent to: {session}."); - error!("Session: {session} will be deleted."); - return Err(ConnectionError::from(IggyError::ClientNotFound( - session.client_id, - ))); } else { - sender.send_error_response(error).await?; - debug!("TCP error response was sent to: {session}."); + error!( + "Command with code {cmd_code} was not handled successfully, session: {session}, error: {error}." + ); + if let IggyError::ClientNotFound(_) = error { + sender.send_error_response(error).await?; + debug!("TCP error response was sent to: {session}."); + error!("Session: {session} will be deleted."); + return Err(ConnectionError::from(IggyError::ClientNotFound( + session.client_id, + ))); + } else { + sender.send_error_response(error).await?; + debug!("TCP error response was sent to: {session}."); + } } } }
