This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch cluster-metadata-port in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 8fd9c403b05be45594b074f47781a41aa1ab83b1 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Nov 21 18:27:00 2025 +0100 feat(cluster): add full transport endpoints to cluster configuration BREAKING CHANGE: ClusterNode now uses TransportEndpoints instead of single address - Add TransportEndpoints struct with TCP, QUIC, HTTP, WebSocket addresses - Restructure cluster config: current node (name only) + other nodes (full details) - Current node derives ports from main transport configs to avoid duplication - Update SDK leader detection to use appropriate transport endpoint - Fix integration tests for new configuration structure This enables cluster nodes to have complete information about all transport endpoints of other nodes, supporting multi-protocol cluster communication. --- bdd/docker-compose.yml | 44 +-- bdd/rust/tests/helpers/cluster.rs | 10 +- bdd/rust/tests/steps/leader_redirection.rs | 24 +- core/common/src/types/cluster/metadata.rs | 38 +- core/common/src/types/cluster/mod.rs | 2 + core/common/src/types/cluster/node.rs | 54 ++- .../src/types/cluster/transport_endpoints.rs | 137 +++++++ core/configs/server.toml | 31 +- core/integration/tests/config_provider/mod.rs | 414 +++++++++------------ core/integration/tests/mcp/mod.rs | 7 +- core/sdk/src/leader_aware.rs | 23 +- core/sdk/src/prelude.rs | 6 +- core/sdk/src/quic/quic_client.rs | 7 +- core/sdk/src/tcp/tcp_client.rs | 7 +- core/sdk/src/websocket/websocket_client.rs | 7 +- core/server/src/configs/cluster.rs | 26 +- core/server/src/configs/config_provider.rs | 27 +- core/server/src/configs/defaults.rs | 22 +- core/server/src/configs/validators.rs | 99 +++-- core/server/src/shard/system/cluster.rs | 131 +++++-- core/server/src/streaming/systems/cluster/mod.rs | 60 +-- core/server/src/streaming/utils/address.rs | 76 ++++ core/server/src/streaming/utils/mod.rs | 1 + 23 files changed, 747 insertions(+), 506 deletions(-) diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml index 
b9469dd31..fd52d5867 100644 --- a/bdd/docker-compose.yml +++ b/bdd/docker-compose.yml @@ -95,18 +95,16 @@ services: - IGGY_CLUSTER_ENABLED=true - IGGY_CLUSTER_NAME=test-cluster - IGGY_CLUSTER_ID=1 - - IGGY_CLUSTER_TRANSPORT=tcp - # This node's identity - - IGGY_CLUSTER_NODE_ID=0 - - IGGY_CLUSTER_NODE_NAME=leader-node - - IGGY_CLUSTER_NODE_ADDRESS=iggy-leader:8091 - # Cluster nodes configuration (indexed format for array) - - IGGY_CLUSTER_NODES_0_ID=0 - - IGGY_CLUSTER_NODES_0_NAME=leader-node - - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091 - - IGGY_CLUSTER_NODES_1_ID=1 - - IGGY_CLUSTER_NODES_1_NAME=follower-node - - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092 + # Current node identity + - IGGY_CLUSTER_NODE_CURRENT_NAME=leader-node + - IGGY_CLUSTER_NODE_CURRENT_IP=iggy-leader + # Other nodes configuration + - IGGY_CLUSTER_NODE_OTHERS_0_NAME=follower-node + - IGGY_CLUSTER_NODE_OTHERS_0_IP=iggy-follower + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP=8092 + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC=8082 + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP=3002 + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET=8072 volumes: - iggy_leader_data:/app/local_data_leader networks: @@ -152,18 +150,16 @@ services: - IGGY_CLUSTER_ENABLED=true - IGGY_CLUSTER_NAME=test-cluster - IGGY_CLUSTER_ID=1 - - IGGY_CLUSTER_TRANSPORT=tcp - # This node's identity (different from leader) - - IGGY_CLUSTER_NODE_ID=1 - - IGGY_CLUSTER_NODE_NAME=follower-node - - IGGY_CLUSTER_NODE_ADDRESS=iggy-follower:8092 - # Cluster nodes configuration (indexed format for array) - - IGGY_CLUSTER_NODES_0_ID=0 - - IGGY_CLUSTER_NODES_0_NAME=leader-node - - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091 - - IGGY_CLUSTER_NODES_1_ID=1 - - IGGY_CLUSTER_NODES_1_NAME=follower-node - - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092 + # Current node identity (different from leader) + - IGGY_CLUSTER_NODE_CURRENT_NAME=follower-node + - IGGY_CLUSTER_NODE_CURRENT_IP=iggy-follower + # Other nodes configuration + - 
IGGY_CLUSTER_NODE_OTHERS_0_NAME=leader-node + - IGGY_CLUSTER_NODE_OTHERS_0_IP=iggy-leader + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP=8091 + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC=8081 + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP=3001 + - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET=8071 volumes: - iggy_follower_data:/app/local_data_follower networks: diff --git a/bdd/rust/tests/helpers/cluster.rs b/bdd/rust/tests/helpers/cluster.rs index 71ef991af..b58c05be9 100644 --- a/bdd/rust/tests/helpers/cluster.rs +++ b/bdd/rust/tests/helpers/cluster.rs @@ -112,7 +112,7 @@ pub fn update_node_role( ) -> bool { if let Some(node) = nodes .iter_mut() - .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", port))) + .find(|n| n.name == format!("node-{}", node_id) && n.endpoints.tcp == port) { node.role = role; node.status = ClusterNodeStatus::Healthy; @@ -122,14 +122,6 @@ pub fn update_node_role( } } -/// Extracts port number from an address string -pub fn extract_port_from_address(address: &str) -> Option<u16> { - address - .rsplit(':') - .next() - .and_then(|port_str| port_str.parse().ok()) -} - /// Determines server type from port number pub fn server_type_from_port(port: u16) -> &'static str { match port { diff --git a/bdd/rust/tests/steps/leader_redirection.rs b/bdd/rust/tests/steps/leader_redirection.rs index 7d484ff3d..20c36af7e 100644 --- a/bdd/rust/tests/steps/leader_redirection.rs +++ b/bdd/rust/tests/steps/leader_redirection.rs @@ -36,10 +36,21 @@ async fn given_cluster_config(world: &mut LeaderContext, node_count: usize) { #[given(regex = r"^node (\d+) is configured on port (\d+)$")] async fn given_node_configured(world: &mut LeaderContext, node_id: u32, port: u16) { + let (quic_port, http_port, ws_port) = match port { + 8091 => (8081, 3001, 8071), // Leader ports + 8092 => (8082, 3002, 8072), // Follower ports + _ => (port - 10, port - 5090, port - 20), // Default mapping + }; + let node = ClusterNode { - id: node_id, name: format!("node-{}", node_id), - 
address: format!("iggy-server:{}", port), + ip: "iggy-server".to_string(), + endpoints: TransportEndpoints::new( + port, // TCP port + quic_port, // QUIC port + http_port, // HTTP port + ws_port, // WebSocket port + ), role: ClusterNodeRole::Follower, status: ClusterNodeStatus::Healthy, }; @@ -196,7 +207,7 @@ async fn then_verify_client_port(world: &mut LeaderContext, expected_port: u16) // Check cluster metadata if available if let Ok(Some(leader)) = cluster::verify_leader_in_metadata(client).await { // If we found a leader and we're connected to the leader port, mark redirection - if cluster::extract_port_from_address(&leader.address) == Some(expected_port) { + if leader.endpoints.tcp == expected_port { world.test_state.redirection_occurred = true; } } @@ -223,8 +234,8 @@ async fn then_verify_named_client_port( if let Ok(Some(leader)) = cluster::verify_leader_in_metadata(client).await { assert!( - cluster::extract_port_from_address(&leader.address).is_some(), - "Client {} should find valid leader in cluster metadata", + leader.endpoints.tcp > 0, + "Client {} should find valid leader TCP port in cluster metadata", client_name ); } @@ -298,7 +309,8 @@ async fn then_both_use_same_server(world: &mut LeaderContext) { cluster::verify_leader_in_metadata(client_b).await, ) { assert_eq!( - leader_a.address, leader_b.address, + format!("{}:{}", leader_a.ip, leader_a.endpoints.tcp), + format!("{}:{}", leader_b.ip, leader_b.endpoints.tcp), "Both clients should see the same leader" ); } diff --git a/core/common/src/types/cluster/metadata.rs b/core/common/src/types/cluster/metadata.rs index c89e94084..3d6e403e0 100644 --- a/core/common/src/types/cluster/metadata.rs +++ b/core/common/src/types/cluster/metadata.rs @@ -16,7 +16,7 @@ * under the License. 
*/ -use crate::{BytesSerializable, IggyError, TransportProtocol, types::cluster::node::ClusterNode}; +use crate::{BytesSerializable, IggyError, types::cluster::node::ClusterNode}; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -28,21 +28,17 @@ pub struct ClusterMetadata { pub name: String, /// Unique identifier of the cluster. pub id: u32, - /// Transport used for cluster communication (for binary protocol it's u8, 1=TCP, 2=QUIC, 3=HTTP). - /// For HTTP it's a string "tcp", "quic", "http". - pub transport: TransportProtocol, - /// List of all nodes in the cluster. + /// List of all nodes in the cluster with their transport endpoints. pub nodes: Vec<ClusterNode>, } impl BytesSerializable for ClusterMetadata { fn to_bytes(&self) -> Bytes { let name_bytes = self.name.as_bytes(); - let transport = self.transport as u8; // Calculate size for each node let nodes_size: usize = self.nodes.iter().map(|node| node.get_buffer_size()).sum(); - let size = 4 + name_bytes.len() + 4 + 1 + 4 + nodes_size; // name_len + name + id + transport + nodes_len + nodes + let size = 4 + name_bytes.len() + 4 + 4 + nodes_size; // name_len + name + id + nodes_len + nodes let mut bytes = BytesMut::with_capacity(size); @@ -53,9 +49,6 @@ impl BytesSerializable for ClusterMetadata { // Write cluster id bytes.put_u32_le(self.id); - // Write transport - bytes.put_u8(transport); - // Write nodes count bytes.put_u32_le(self.nodes.len() as u32); @@ -101,14 +94,6 @@ impl BytesSerializable for ClusterMetadata { ); position += 4; - // Read transport - if bytes.len() < position + 1 { - return Err(IggyError::InvalidCommand); - } - let transport_byte = bytes[position]; - let transport = TransportProtocol::try_from(transport_byte)?; - position += 1; - // Read nodes count if bytes.len() < position + 4 { return Err(IggyError::InvalidCommand); @@ -128,17 +113,11 @@ impl BytesSerializable for ClusterMetadata { nodes.push(node); } - Ok(ClusterMetadata { - name, - 
id, - transport, - nodes, - }) + Ok(ClusterMetadata { name, id, nodes }) } fn write_to_buffer(&self, buf: &mut BytesMut) { let name_bytes = self.name.as_bytes(); - let transport = self.transport as u8; // Write name length and name buf.put_u32_le(name_bytes.len() as u32); @@ -147,9 +126,6 @@ impl BytesSerializable for ClusterMetadata { // Write cluster id buf.put_u32_le(self.id); - // Write transport as u8 - buf.put_u8(transport); - // Write nodes count buf.put_u32_le(self.nodes.len() as u32); @@ -161,7 +137,7 @@ impl BytesSerializable for ClusterMetadata { fn get_buffer_size(&self) -> usize { let nodes_size: usize = self.nodes.iter().map(|node| node.get_buffer_size()).sum(); - 4 + self.name.len() + 4 + 1 + 4 + nodes_size // name_len + name + id + transport + nodes_len + nodes + 4 + self.name.len() + 4 + 4 + nodes_size // name_len + name + id + nodes_len + nodes } } @@ -175,8 +151,8 @@ impl Display for ClusterMetadata { write!( f, - "ClusterMetadata {{ name: {}, id: {}, transport: {}, nodes: {:?} }}", - self.name, self.id, self.transport, nodes + "ClusterMetadata {{ name: {}, id: {}, nodes: {:?} }}", + self.name, self.id, nodes ) } } diff --git a/core/common/src/types/cluster/mod.rs b/core/common/src/types/cluster/mod.rs index 7573d89be..3215bb9c5 100644 --- a/core/common/src/types/cluster/mod.rs +++ b/core/common/src/types/cluster/mod.rs @@ -20,8 +20,10 @@ mod metadata; mod node; mod role; mod status; +mod transport_endpoints; pub use metadata::ClusterMetadata; pub use node::ClusterNode; pub use role::ClusterNodeRole; pub use status::ClusterNodeStatus; +pub use transport_endpoints::TransportEndpoints; diff --git a/core/common/src/types/cluster/node.rs b/core/common/src/types/cluster/node.rs index aa2308040..63c2dad1d 100644 --- a/core/common/src/types/cluster/node.rs +++ b/core/common/src/types/cluster/node.rs @@ -18,7 +18,9 @@ use crate::{ BytesSerializable, IggyError, - types::cluster::{role::ClusterNodeRole, status::ClusterNodeStatus}, + types::cluster::{ + 
role::ClusterNodeRole, status::ClusterNodeStatus, transport_endpoints::TransportEndpoints, + }, }; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -26,9 +28,9 @@ use std::fmt::Display; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct ClusterNode { - pub id: u32, pub name: String, - pub address: String, + pub ip: String, + pub endpoints: TransportEndpoints, pub role: ClusterNodeRole, pub status: ClusterNodeStatus, } @@ -43,20 +45,12 @@ impl BytesSerializable for ClusterNode { fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { if bytes.len() < 10 { - // Minimum: 4 (id) + 4 (name_len) + 4 (address_len) + 1 (role) + 1 (status) + // Minimum: 4 (name_len) + 4 (ip_len) + 1 (role) + 1 (status) return Err(IggyError::InvalidCommand); } let mut position = 0; - // Read id - let id = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - position += 4; - // Read name length let name_len = u32::from_le_bytes( bytes[position..position + 4] @@ -73,24 +67,26 @@ impl BytesSerializable for ClusterNode { .map_err(|_| IggyError::InvalidCommand)?; position += name_len; - // Read address length - if bytes.len() < position + 4 { - return Err(IggyError::InvalidCommand); - } - let address_len = u32::from_le_bytes( + // Read IP length + let ip_len = u32::from_le_bytes( bytes[position..position + 4] .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ) as usize; position += 4; - // Read address - if bytes.len() < position + address_len { + // Read IP + if bytes.len() < position + ip_len { return Err(IggyError::InvalidCommand); } - let address = String::from_utf8(bytes[position..position + address_len].to_vec()) + let ip = String::from_utf8(bytes[position..position + ip_len].to_vec()) .map_err(|_| IggyError::InvalidCommand)?; - position += address_len; + position += ip_len; + + // Read transport endpoints + let endpoints_bytes = bytes.slice(position..); + let endpoints = 
TransportEndpoints::from_bytes(endpoints_bytes)?; + position += endpoints.get_buffer_size(); // Read role if bytes.len() < position + 1 { @@ -106,26 +102,26 @@ impl BytesSerializable for ClusterNode { let status = ClusterNodeStatus::try_from(bytes[position])?; Ok(ClusterNode { - id, name, - address, + ip, + endpoints, role, status, }) } fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.put_u32_le(self.id); buf.put_u32_le(self.name.len() as u32); buf.put_slice(self.name.as_bytes()); - buf.put_u32_le(self.address.len() as u32); - buf.put_slice(self.address.as_bytes()); + buf.put_u32_le(self.ip.len() as u32); + buf.put_slice(self.ip.as_bytes()); + self.endpoints.write_to_buffer(buf); self.role.write_to_buffer(buf); self.status.write_to_buffer(buf); } fn get_buffer_size(&self) -> usize { - 4 + 4 + self.name.len() + 4 + self.address.len() + 1 + 1 // id + name_len + name + address_len + address + role + status + 4 + self.name.len() + 4 + self.ip.len() + self.endpoints.get_buffer_size() + 1 + 1 // name_len + name + ip_len + ip + endpoints + role + status } } @@ -133,8 +129,8 @@ impl Display for ClusterNode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "ClusterNode {{ id: {}, name: {}, address: {}, role: {}, status: {} }}", - self.id, self.name, self.address, self.role, self.status + "ClusterNode {{ name: {}, ip: {}, endpoints: {}, role: {}, status: {} }}", + self.name, self.ip, self.endpoints, self.role, self.status ) } } diff --git a/core/common/src/types/cluster/transport_endpoints.rs b/core/common/src/types/cluster/transport_endpoints.rs new file mode 100644 index 000000000..a1e8db8b5 --- /dev/null +++ b/core/common/src/types/cluster/transport_endpoints.rs @@ -0,0 +1,137 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::{BytesSerializable, IggyError}; +use bytes::{BufMut, Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TransportEndpoints { + pub tcp: u16, + pub quic: u16, + pub http: u16, + pub websocket: u16, +} + +impl TransportEndpoints { + pub fn new(tcp: u16, quic: u16, http: u16, websocket: u16) -> Self { + Self { + tcp, + quic, + http, + websocket, + } + } + + /// Creates full address strings by combining with an IP address + pub fn with_ip(&self, ip: &str) -> TransportEndpointsAddresses { + TransportEndpointsAddresses { + tcp: format!("{}:{}", ip, self.tcp), + quic: format!("{}:{}", ip, self.quic), + http: format!("{}:{}", ip, self.http), + websocket: format!("{}:{}", ip, self.websocket), + } + } +} + +/// Helper struct for when we need full addresses +#[derive(Debug, Clone)] +pub struct TransportEndpointsAddresses { + pub tcp: String, + pub quic: String, + pub http: String, + pub websocket: String, +} + +impl BytesSerializable for TransportEndpoints { + fn to_bytes(&self) -> Bytes { + let size = self.get_buffer_size(); + let mut bytes = BytesMut::with_capacity(size); + self.write_to_buffer(&mut bytes); + bytes.freeze() + } + + fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { + if bytes.len() < 8 { + // Minimum: 4 ports * 2 bytes each + return 
Err(IggyError::InvalidCommand); + } + + let mut position = 0; + + // Read TCP port + let tcp = u16::from_le_bytes( + bytes[position..position + 2] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + position += 2; + + // Read QUIC port + let quic = u16::from_le_bytes( + bytes[position..position + 2] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + position += 2; + + // Read HTTP port + let http = u16::from_le_bytes( + bytes[position..position + 2] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + position += 2; + + // Read WebSocket port + let websocket = u16::from_le_bytes( + bytes[position..position + 2] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + + Ok(TransportEndpoints { + tcp, + quic, + http, + websocket, + }) + } + + fn write_to_buffer(&self, buf: &mut BytesMut) { + buf.put_u16_le(self.tcp); + buf.put_u16_le(self.quic); + buf.put_u16_le(self.http); + buf.put_u16_le(self.websocket); + } + + fn get_buffer_size(&self) -> usize { + 8 // 4 ports * 2 bytes each + } +} + +impl Display for TransportEndpoints { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "tcp: {}, quic: {}, http: {}, websocket: {}", + self.tcp, self.quic, self.http, self.websocket + ) + } +} diff --git a/core/configs/server.toml b/core/configs/server.toml index 892c8cdbf..01500569a 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -518,26 +518,25 @@ id = 0 # This prevents accidental cross-cluster communication. name = "iggy-cluster" -# Transport protocol used for communication between nodes in the cluster (string). -# Supported values: "tcp", "quic", "ws" -transport = "tcp" - # Current node configuration -[cluster.node] -# This node unique identifier within the cluster (u32). -# Must be unique across all nodes and match one of the ids in the nodes list. -id = 0 - -# All nodes in the cluster. 
-[[cluster.nodes]] -id = 0 +[cluster.node.current] +# Name of this node - must be unique across all nodes in the cluster +# The node will use its configured transport addresses (tcp.address, quic.address, etc.) name = "iggy-node-1" -address = "127.0.0.1:8090" +# IP address that other nodes should use to connect to this node +ip = "127.0.0.1" -[[cluster.nodes]] -id = 1 +# Other nodes in the cluster +# Each node must specify all transport ports explicitly +[[cluster.node.others]] name = "iggy-node-2" -address = "127.0.0.1:8091" +ip = "127.0.0.1" +ports = { tcp = 8091, quic = 8081, http = 3001, websocket = 8093 } + +# [[cluster.node.others]] +# name = "iggy-node-3" +# ip = "192.168.1.100" +# ports = { tcp = 8092, quic = 8082, http = 3002, websocket = 8094 } # Sharding configuration [system.sharding] diff --git a/core/integration/tests/config_provider/mod.rs b/core/integration/tests/config_provider/mod.rs index 8b41411bf..aa0976c5c 100644 --- a/core/integration/tests/config_provider/mod.rs +++ b/core/integration/tests/config_provider/mod.rs @@ -16,59 +16,31 @@ * under the License. 
*/ -use integration::file::{file_exists, get_root_path}; use serial_test::serial; use server::configs::config_provider::{ConfigProvider, FileConfigProvider}; use std::env; -async fn scenario_parsing_from_file(extension: &str) { - let mut config_path = get_root_path().join("../configs/server"); - assert!(config_path.set_extension(extension), "Cannot set extension"); - let config_path = config_path.as_path().display().to_string(); - let config_provider = FileConfigProvider::new(config_path.clone()); - assert!( - file_exists(&config_path), - "Config file not found: {config_path}" - ); - assert!( - config_provider.load_config().await.is_ok(), - "ConfigProvider failed to parse config from {config_path}" - ); -} +use integration::file::get_root_path; -#[compio::test] -async fn validate_server_config_toml_from_repository() { - scenario_parsing_from_file("toml").await; -} - -// This test needs to be run in serial because it modifies the environment variables -// which are shared, since all tests run in parallel by default. 
#[serial] -#[compio::test] -async fn validate_custom_env_provider() { - let expected_datagram_send_buffer_size = "1.00 KB"; - let expected_quic_certificate_self_signed = false; - let expected_http_enabled = false; - let expected_tcp_enabled = "false"; - let expected_message_saver_enabled = false; - let expected_message_expiry = "10s"; +#[tokio::test] +async fn validate_config_env_override() { + let expected_http = true; + let expected_tcp = true; + let expected_message_saver = true; + let expected_message_expiry = "1000ms"; unsafe { + env::set_var("IGGY_HTTP_ENABLED", expected_http.to_string()); + env::set_var("IGGY_TCP_ENABLED", expected_tcp.to_string()); env::set_var( - "IGGY_QUIC_DATAGRAM_SEND_BUFFER_SIZE", - expected_datagram_send_buffer_size, - ); - env::set_var( - "IGGY_QUIC_CERTIFICATE_SELF_SIGNED", - expected_quic_certificate_self_signed.to_string(), + "IGGY_MESSAGE_SAVER_ENABLED", + expected_message_saver.to_string(), ); - env::set_var("IGGY_HTTP_ENABLED", expected_http_enabled.to_string()); - env::set_var("IGGY_TCP_ENABLED", expected_tcp_enabled); env::set_var( - "IGGY_MESSAGE_SAVER_ENABLED", - expected_message_saver_enabled.to_string(), + "IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", + expected_message_expiry, ); - env::set_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", "10s"); } let config_path = get_root_path().join("../configs/server.toml"); @@ -76,220 +48,145 @@ async fn validate_custom_env_provider() { let config = file_config_provider .load_config() .await - .expect("Failed to load default server.toml config"); + .expect("Failed to load server.toml config"); + assert_eq!(config.http.enabled, expected_http); + assert_eq!(config.tcp.enabled, expected_tcp); + assert_eq!(config.message_saver.enabled, expected_message_saver); + + // Check message_expiry was properly set from environment variable + use iggy_common::{IggyDuration, IggyExpiry}; + use std::time::Duration; assert_eq!( - config.quic.datagram_send_buffer_size.to_string(), - 
expected_datagram_send_buffer_size - ); - assert_eq!( - config.quic.certificate.self_signed, - expected_quic_certificate_self_signed - ); - assert_eq!(config.http.enabled, expected_http_enabled); - assert_eq!(config.tcp.enabled.to_string(), expected_tcp_enabled); - assert_eq!(config.message_saver.enabled, expected_message_saver_enabled); - assert_eq!( - config.system.segment.message_expiry.to_string(), - expected_message_expiry + config.system.segment.message_expiry, + IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_millis(1000))) ); unsafe { - env::remove_var("IGGY_QUIC_DATAGRAM_SEND_BUFFER_SIZE"); - env::remove_var("IGGY_QUIC_CERTIFICATE_SELF_SIGNED"); env::remove_var("IGGY_HTTP_ENABLED"); env::remove_var("IGGY_TCP_ENABLED"); env::remove_var("IGGY_MESSAGE_SAVER_ENABLED"); - env::remove_var("IGGY_SYSTEM_RETENTION_POLICY_MESSAGE_EXPIRY"); + env::remove_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY"); } } -// Test for cluster configuration with environment variable overrides #[serial] #[tokio::test] -async fn validate_cluster_config_env_override() { - // Test data for cluster configuration - let expected_cluster_enabled = true; - let expected_cluster_id = 0; - let expected_cluster_name = "test-cluster"; - let expected_node_id = 0; - - // Test data for cluster nodes array - let expected_node_0_id = 0; - let expected_node_0_name = "test-node-1"; - let expected_node_0_address = "192.168.1.100:9090"; - - let expected_node_1_id = 1; - let expected_node_1_name = "test-node-2"; - let expected_node_1_address = "192.168.1.101:9091"; - - let expected_node_2_id = 2; - let expected_node_2_name = "test-node-3"; - let expected_node_2_address = "192.168.1.102:9092"; - +async fn validate_config_invalid_env() { unsafe { - // Set cluster configuration environment variables - env::set_var("IGGY_CLUSTER_ENABLED", expected_cluster_enabled.to_string()); - env::set_var("IGGY_CLUSTER_ID", expected_cluster_id.to_string()); - env::set_var("IGGY_CLUSTER_NAME", expected_cluster_name); - 
env::set_var("IGGY_CLUSTER_NODE_ID", expected_node_id.to_string()); - - // Set cluster nodes array environment variables - env::set_var("IGGY_CLUSTER_NODES_0_ID", expected_node_0_id.to_string()); - env::set_var("IGGY_CLUSTER_NODES_0_NAME", expected_node_0_name); - env::set_var("IGGY_CLUSTER_NODES_0_ADDRESS", expected_node_0_address); - - env::set_var("IGGY_CLUSTER_NODES_1_ID", expected_node_1_id.to_string()); - env::set_var("IGGY_CLUSTER_NODES_1_NAME", expected_node_1_name); - env::set_var("IGGY_CLUSTER_NODES_1_ADDRESS", expected_node_1_address); - - env::set_var("IGGY_CLUSTER_NODES_2_ID", expected_node_2_id.to_string()); - env::set_var("IGGY_CLUSTER_NODES_2_NAME", expected_node_2_name); - env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address); + env::set_var("IGGY_HTTP_ENABLED", "wrong bool"); } let config_path = get_root_path().join("../configs/server.toml"); let file_config_provider = FileConfigProvider::new(config_path.as_path().display().to_string()); - let config = file_config_provider - .load_config() - .await - .expect("Failed to load server.toml config with cluster env overrides"); - - // Verify cluster configuration - assert_eq!(config.cluster.enabled, expected_cluster_enabled); - assert_eq!(config.cluster.id, expected_cluster_id); - assert_eq!(config.cluster.name, expected_cluster_name); - assert_eq!(config.cluster.node.id, expected_node_id); + let result = file_config_provider.load_config().await; - // Verify cluster nodes array - should have 3 nodes instead of the default 2 - assert_eq!( - config.cluster.nodes.len(), - 3, - "Should have 3 nodes from environment variables" + assert!( + result.is_err(), + "Config should fail with invalid env values" ); - // Verify first node - assert_eq!(config.cluster.nodes[0].id, expected_node_0_id); - assert_eq!(config.cluster.nodes[0].name, expected_node_0_name); - assert_eq!(config.cluster.nodes[0].address, expected_node_0_address); - - // Verify second node - assert_eq!(config.cluster.nodes[1].id, 
expected_node_1_id); - assert_eq!(config.cluster.nodes[1].name, expected_node_1_name); - assert_eq!(config.cluster.nodes[1].address, expected_node_1_address); - - // Verify third node (added via env vars) - assert_eq!(config.cluster.nodes[2].id, expected_node_2_id); - assert_eq!(config.cluster.nodes[2].name, expected_node_2_name); - assert_eq!(config.cluster.nodes[2].address, expected_node_2_address); - unsafe { - // Clean up environment variables - env::remove_var("IGGY_CLUSTER_ENABLED"); - env::remove_var("IGGY_CLUSTER_ID"); - env::remove_var("IGGY_CLUSTER_NAME"); - env::remove_var("IGGY_CLUSTER_NODE_ID"); - - env::remove_var("IGGY_CLUSTER_NODES_0_ID"); - env::remove_var("IGGY_CLUSTER_NODES_0_NAME"); - env::remove_var("IGGY_CLUSTER_NODES_0_ADDRESS"); - - env::remove_var("IGGY_CLUSTER_NODES_1_ID"); - env::remove_var("IGGY_CLUSTER_NODES_1_NAME"); - env::remove_var("IGGY_CLUSTER_NODES_1_ADDRESS"); - - env::remove_var("IGGY_CLUSTER_NODES_2_ID"); - env::remove_var("IGGY_CLUSTER_NODES_2_NAME"); - env::remove_var("IGGY_CLUSTER_NODES_2_ADDRESS"); + env::remove_var("IGGY_HTTP_ENABLED"); } } -// Test partial override - only override specific fields +// Test empty list validation #[serial] #[tokio::test] -async fn validate_cluster_partial_env_override() { - // Only override the cluster ID and one node's address - let expected_cluster_id = 99; - let expected_node_1_address = "10.0.0.1:8888"; - - unsafe { - env::set_var("IGGY_CLUSTER_ID", expected_cluster_id.to_string()); - env::set_var("IGGY_CLUSTER_NODES_1_ADDRESS", expected_node_1_address); - } - +async fn validate_empty_array_config() { let config_path = get_root_path().join("../configs/server.toml"); let file_config_provider = FileConfigProvider::new(config_path.as_path().display().to_string()); - let config = file_config_provider + let _config = file_config_provider .load_config() .await - .expect("Failed to load server.toml config with partial cluster env overrides"); - - // Verify overridden values - 
assert_eq!(config.cluster.id, expected_cluster_id); - assert_eq!(config.cluster.nodes[1].address, expected_node_1_address); - - // Verify non-overridden values remain default - assert_eq!(config.cluster.name, "iggy-cluster"); // default from server.toml - assert_eq!(config.cluster.nodes[0].id, 0); // default from server.toml - assert_eq!(config.cluster.nodes[0].name, "iggy-node-1"); // default from server.toml - assert_eq!(config.cluster.nodes[1].id, 1); // default from server.toml - assert_eq!(config.cluster.nodes[1].name, "iggy-node-2"); // default from server.toml + .expect("Failed to load server.toml config"); + // Ensure environment variable overrides do not affect empty arrays unsafe { - env::remove_var("IGGY_CLUSTER_ID"); - env::remove_var("IGGY_CLUSTER_NODES_1_ADDRESS"); + env::remove_var("IGGY_HTTP_ENABLED"); + env::remove_var("IGGY_TCP_ENABLED"); + env::remove_var("IGGY_MESSAGE_SAVER_ENABLED"); + env::remove_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY"); } } -// Test sparse array override - setting index 5 when only 2 nodes exist in TOML -// This test verifies that sparse arrays will fail because intermediate elements -// won't have required fields, which is the expected safety behavior +// Test for cluster configuration with environment variable overrides #[serial] #[tokio::test] -async fn validate_cluster_sparse_array_fails_with_missing_fields() { - // Set node at index 5 (when TOML only has nodes 0 and 1) - // This should fail because nodes 2-4 will be created as empty dicts - // without required fields - let expected_node_5_id = 100; - let expected_node_5_name = "sparse-node"; - let expected_node_5_address = "10.0.0.100:9999"; - - unsafe { - env::set_var("IGGY_CLUSTER_NODES_5_ID", expected_node_5_id.to_string()); - env::set_var("IGGY_CLUSTER_NODES_5_NAME", expected_node_5_name); - env::set_var("IGGY_CLUSTER_NODES_5_ADDRESS", expected_node_5_address); - } - - let config_path = get_root_path().join("../configs/server.toml"); - let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string()); - - // This should fail because nodes 2-4 will be missing required fields - let result = file_config_provider.load_config().await; - assert!( - result.is_err(), - "Should fail to load config with sparse array due to missing required fields in intermediate elements" - ); +async fn validate_cluster_config_env_override() { + // Test data for cluster configuration + let expected_cluster_enabled = true; + let expected_cluster_id = 0; + let expected_cluster_name = "test-cluster"; + let expected_current_node_name = "test-node-1"; + + // Test data for other nodes in cluster + let expected_other_node_0_name = "test-node-2"; + let expected_other_node_0_ip = "192.168.1.101"; + let expected_other_node_0_tcp = 9091_u16; + let expected_other_node_0_quic = 9081_u16; + let expected_other_node_0_http = 4001_u16; + let expected_other_node_0_websocket = 9093_u16; + + let expected_other_node_1_name = "test-node-3"; + let expected_other_node_1_ip = "192.168.1.102"; + let expected_other_node_1_tcp = 9092_u16; + let expected_other_node_1_quic = 9082_u16; + let expected_other_node_1_http = 4002_u16; + let expected_other_node_1_websocket = 9094_u16; unsafe { - env::remove_var("IGGY_CLUSTER_NODES_5_ID"); - env::remove_var("IGGY_CLUSTER_NODES_5_NAME"); - env::remove_var("IGGY_CLUSTER_NODES_5_ADDRESS"); - } -} + // Set cluster configuration environment variables + env::set_var("IGGY_CLUSTER_ENABLED", expected_cluster_enabled.to_string()); + env::set_var("IGGY_CLUSTER_ID", expected_cluster_id.to_string()); + env::set_var("IGGY_CLUSTER_NAME", expected_cluster_name); + env::set_var("IGGY_CLUSTER_NODE_CURRENT_NAME", expected_current_node_name); -// Test that we can add node 2 successfully (contiguous array) -#[serial] -#[tokio::test] -async fn validate_cluster_contiguous_array_override() { - // Add node at index 2 (TOML has nodes 0 and 1, so this is contiguous) - let expected_node_2_id = 2; - let expected_node_2_name = 
"iggy-node-3"; - let expected_node_2_address = "10.0.0.50:8092"; + // Set other nodes array environment variables + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_0_NAME", + expected_other_node_0_name, + ); + env::set_var("IGGY_CLUSTER_NODE_OTHERS_0_IP", expected_other_node_0_ip); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP", + expected_other_node_0_tcp.to_string(), + ); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC", + expected_other_node_0_quic.to_string(), + ); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP", + expected_other_node_0_http.to_string(), + ); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET", + expected_other_node_0_websocket.to_string(), + ); - unsafe { - env::set_var("IGGY_CLUSTER_NODES_2_ID", expected_node_2_id.to_string()); - env::set_var("IGGY_CLUSTER_NODES_2_NAME", expected_node_2_name); - env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_1_NAME", + expected_other_node_1_name, + ); + env::set_var("IGGY_CLUSTER_NODE_OTHERS_1_IP", expected_other_node_1_ip); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_TCP", + expected_other_node_1_tcp.to_string(), + ); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_QUIC", + expected_other_node_1_quic.to_string(), + ); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_HTTP", + expected_other_node_1_http.to_string(), + ); + env::set_var( + "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_WEBSOCKET", + expected_other_node_1_websocket.to_string(), + ); } let config_path = get_root_path().join("../configs/server.toml"); @@ -297,29 +194,86 @@ async fn validate_cluster_contiguous_array_override() { let config = file_config_provider .load_config() .await - .expect("Failed to load server.toml config with contiguous array override"); + .expect("Failed to load server.toml config with cluster env overrides"); + + // Verify cluster configuration + assert_eq!(config.cluster.enabled, expected_cluster_enabled); + 
assert_eq!(config.cluster.id, expected_cluster_id); + assert_eq!(config.cluster.name, expected_cluster_name); + assert_eq!(config.cluster.node.current.name, expected_current_node_name); - // Should have 3 nodes total + // Verify other nodes array - should have 2 nodes from environment variables assert_eq!( - config.cluster.nodes.len(), - 3, - "Should have 3 nodes when adding index 2" + config.cluster.node.others.len(), + 2, + "Should have 2 other nodes from environment variables" ); - // Check original nodes are preserved - assert_eq!(config.cluster.nodes[0].id, 0); // from TOML - assert_eq!(config.cluster.nodes[0].name, "iggy-node-1"); // from TOML - assert_eq!(config.cluster.nodes[1].id, 1); // from TOML - assert_eq!(config.cluster.nodes[1].name, "iggy-node-2"); // from TOML + // Verify first other node + assert_eq!( + config.cluster.node.others[0].name, + expected_other_node_0_name + ); + assert_eq!(config.cluster.node.others[0].ip, expected_other_node_0_ip); + assert_eq!( + config.cluster.node.others[0].ports.tcp, + expected_other_node_0_tcp + ); + assert_eq!( + config.cluster.node.others[0].ports.quic, + expected_other_node_0_quic + ); + assert_eq!( + config.cluster.node.others[0].ports.http, + expected_other_node_0_http + ); + assert_eq!( + config.cluster.node.others[0].ports.websocket, + expected_other_node_0_websocket + ); - // Check the node we added at index 2 - assert_eq!(config.cluster.nodes[2].id, expected_node_2_id); - assert_eq!(config.cluster.nodes[2].name, expected_node_2_name); - assert_eq!(config.cluster.nodes[2].address, expected_node_2_address); + // Verify second other node + assert_eq!( + config.cluster.node.others[1].name, + expected_other_node_1_name + ); + assert_eq!(config.cluster.node.others[1].ip, expected_other_node_1_ip); + assert_eq!( + config.cluster.node.others[1].ports.tcp, + expected_other_node_1_tcp + ); + assert_eq!( + config.cluster.node.others[1].ports.quic, + expected_other_node_1_quic + ); + assert_eq!( + 
config.cluster.node.others[1].ports.http, + expected_other_node_1_http + ); + assert_eq!( + config.cluster.node.others[1].ports.websocket, + expected_other_node_1_websocket + ); unsafe { - env::remove_var("IGGY_CLUSTER_NODES_2_ID"); - env::remove_var("IGGY_CLUSTER_NODES_2_NAME"); - env::remove_var("IGGY_CLUSTER_NODES_2_ADDRESS"); + // Clean up environment variables + env::remove_var("IGGY_CLUSTER_ENABLED"); + env::remove_var("IGGY_CLUSTER_ID"); + env::remove_var("IGGY_CLUSTER_NAME"); + env::remove_var("IGGY_CLUSTER_NODE_CURRENT_NAME"); + + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_NAME"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_IP"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET"); + + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_NAME"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_IP"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_TCP"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_QUIC"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_HTTP"); + env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_WEBSOCKET"); } } diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs index 124522a3a..74017d45a 100644 --- a/core/integration/tests/mcp/mod.rs +++ b/core/integration/tests/mcp/mod.rs @@ -78,8 +78,6 @@ async fn mcp_server_should_handle_ping() { assert_empty_response("ping", None).await; } -// TODO(hubcio): not sure how to fix the cluster ports in CI -#[ignore] #[tokio::test] #[parallel] async fn mcp_server_should_return_cluster_metadata() { @@ -88,7 +86,7 @@ async fn mcp_server_should_return_cluster_metadata() { assert!(!cluster.name.is_empty()); assert_eq!(cluster.nodes.len(), 1); let node = &cluster.nodes[0]; - assert_eq!(node.id, 0); + assert!(!node.name.is_empty()); }) .await; } @@ -579,8 +577,7 @@ async fn 
invoke_request<T: DeserializeOwned>( async fn setup() -> McpInfra { let mut iggy_envs = HashMap::new(); - // TODO(hubcio): not sure how to fix the cluster ports in CI - // iggy_envs.insert("IGGY_CLUSTER_ENABLED".to_owned(), "true".to_owned()); + iggy_envs.insert("IGGY_CLUSTER_ENABLED".to_owned(), "true".to_owned()); iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()); iggy_envs.insert("IGGY_WEBSOCKET_ENABLED".to_owned(), "false".to_owned()); let mut test_server = TestServer::new(Some(iggy_envs), true, None, IpAddrKind::V4); diff --git a/core/sdk/src/leader_aware.rs b/core/sdk/src/leader_aware.rs index 9367a7b13..9e910c583 100644 --- a/core/sdk/src/leader_aware.rs +++ b/core/sdk/src/leader_aware.rs @@ -19,6 +19,7 @@ use iggy_binary_protocol::ClusterClient; use iggy_common::{ ClusterMetadata, ClusterNodeRole, ClusterNodeStatus, IggyError, IggyErrorDiscriminants, + TransportProtocol, }; use std::net::SocketAddr; use std::str::FromStr; @@ -31,6 +32,7 @@ const MAX_LEADER_REDIRECTS: u8 = 3; pub async fn check_and_redirect_to_leader<C: ClusterClient>( client: &C, current_address: &str, + transport: TransportProtocol, ) -> Result<Option<String>, IggyError> { debug!("Checking cluster metadata for leader detection"); @@ -41,7 +43,7 @@ pub async fn check_and_redirect_to_leader<C: ClusterClient>( metadata.nodes.len(), metadata.name ); - process_cluster_metadata(&metadata, current_address) + process_cluster_metadata(&metadata, current_address, transport) } Err(e) if is_feature_unavailable_error(&e) => { debug!("Cluster metadata feature unavailable - server doesn't support clustering"); @@ -61,6 +63,7 @@ pub async fn check_and_redirect_to_leader<C: ClusterClient>( fn process_cluster_metadata( metadata: &ClusterMetadata, current_address: &str, + transport: TransportProtocol, ) -> Result<Option<String>, IggyError> { let leader = metadata .nodes @@ -69,17 +72,25 @@ fn process_cluster_metadata( match leader { Some(leader_node) => { + let leader_port = match 
transport { + TransportProtocol::Tcp => leader_node.endpoints.tcp, + TransportProtocol::Quic => leader_node.endpoints.quic, + TransportProtocol::Http => leader_node.endpoints.http, + TransportProtocol::WebSocket => leader_node.endpoints.websocket, + }; + let leader_address = format!("{}:{}", leader_node.ip, leader_port); + info!( - "Found leader node: {} at {}", - leader_node.name, leader_node.address + "Found leader node: {} at {} (using {} transport)", + leader_node.name, leader_address, transport ); - if !is_same_address(current_address, &leader_node.address) { + if !is_same_address(current_address, &leader_address) { info!( "Current connection to {} is not the leader, will redirect to {}", - current_address, leader_node.address + current_address, leader_address ); - Ok(Some(leader_node.address.clone())) + Ok(Some(leader_address)) } else { debug!("Already connected to leader at {}", current_address); Ok(None) diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 8780dd947..5459b3125 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -64,9 +64,9 @@ pub use iggy_common::{ PollingKind, PollingStrategy, QuicClientConfig, QuicClientConfigBuilder, QuicClientReconnectionConfig, SendMessages, Sizeable, SnapshotCompression, Stats, Stream, StreamDetails, StreamPermissions, SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder, - TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportProtocol, UserId, - UserStatus, Validatable, WebSocketClientConfig, WebSocketClientConfigBuilder, - WebSocketClientReconnectionConfig, defaults, locking, + TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportEndpoints, + TransportProtocol, UserId, UserStatus, Validatable, WebSocketClientConfig, + WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, defaults, locking, }; pub use iggy_common::{ IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE, diff --git 
a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index ecf5bbf86..3f7f6854f 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -424,7 +424,12 @@ impl QuicClient { /// Returns true if redirection occurred and reconnection is needed. pub(crate) async fn handle_leader_redirection(&self) -> Result<bool, IggyError> { let current_address = self.current_server_address.lock().await.clone(); - let leader_address = check_and_redirect_to_leader(self, &current_address).await?; + let leader_address = check_and_redirect_to_leader( + self, + &current_address, + iggy_common::TransportProtocol::Quic, + ) + .await?; if let Some(new_leader_address) = leader_address { let mut redirection_state = self.leader_redirection_state.lock().await; diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 66017fad3..0c25a5173 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -476,7 +476,12 @@ impl TcpClient { /// Returns true if redirection occurred and reconnection is needed. pub(crate) async fn handle_leader_redirection(&self) -> Result<bool, IggyError> { let current_address = self.current_server_address.lock().await.clone(); - let leader_address = check_and_redirect_to_leader(self, &current_address).await?; + let leader_address = check_and_redirect_to_leader( + self, + &current_address, + iggy_common::TransportProtocol::Tcp, + ) + .await?; if let Some(new_leader_address) = leader_address { let mut redirection_state = self.leader_redirection_state.lock().await; diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 399df9ba7..ff89d2a30 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -437,7 +437,12 @@ impl WebSocketClient { /// Returns true if redirection occurred and reconnection is needed. 
pub(crate) async fn handle_leader_redirection(&self) -> Result<bool, IggyError> { let current_address = self.current_server_address.lock().await.clone(); - let leader_address = check_and_redirect_to_leader(self, &current_address).await?; + let leader_address = check_and_redirect_to_leader( + self, + &current_address, + iggy_common::TransportProtocol::WebSocket, + ) + .await?; if let Some(new_leader_address) = leader_address { let mut redirection_state = self.leader_redirection_state.lock().await; diff --git a/core/server/src/configs/cluster.rs b/core/server/src/configs/cluster.rs index 3a0d4506c..59ea8c731 100644 --- a/core/server/src/configs/cluster.rs +++ b/core/server/src/configs/cluster.rs @@ -16,7 +16,6 @@ * under the License. */ -use iggy_common::TransportProtocol; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -24,19 +23,32 @@ pub struct ClusterConfig { pub enabled: bool, pub name: String, pub id: u32, - pub transport: TransportProtocol, pub node: NodeConfig, - pub nodes: Vec<ClusterNodeConfig>, } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct NodeConfig { - pub id: u32, + pub current: CurrentNodeConfig, + pub others: Vec<OtherNodeConfig>, } #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct ClusterNodeConfig { - pub id: u32, +pub struct CurrentNodeConfig { pub name: String, - pub address: String, + pub ip: String, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct OtherNodeConfig { + pub name: String, + pub ip: String, + pub ports: TransportPorts, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TransportPorts { + pub tcp: u16, + pub quic: u16, + pub http: u16, + pub websocket: u16, } diff --git a/core/server/src/configs/config_provider.rs b/core/server/src/configs/config_provider.rs index b77ec4bdf..539aa9cdb 100644 --- a/core/server/src/configs/config_provider.rs +++ b/core/server/src/configs/config_provider.rs @@ -168,12 +168,27 @@ impl CustomEnvProvider { if 
remaining_path.is_empty() { arr[array_index] = value; } else if let FigmentValue::Dict(_, elem_dict) = &mut arr[array_index] { - Self::insert_overridden_values_from_env( - &Dict::new(), - elem_dict, - remaining_path.to_vec(), - value, - ); + // For nested structures in arrays, check if we need to create intermediate dicts + // Handle the "ports" case where it should be a nested structure + if remaining_path.len() >= 2 && remaining_path[0] == "ports" { + // Create the ports dict if it doesn't exist + elem_dict + .entry("ports".to_string()) + .or_insert_with(|| FigmentValue::Dict(Tag::Default, Dict::new())); + + if let Some(FigmentValue::Dict(_, ports_dict)) = elem_dict.get_mut("ports") { + // Insert the specific port value (tcp, quic, http, websocket) + ports_dict.insert(remaining_path[1].clone(), value); + } + } else { + // Default behavior for other fields + Self::insert_overridden_values_from_env( + &Dict::new(), + elem_dict, + remaining_path.to_vec(), + value, + ); + } } } } diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index ce990815c..a2ec9f229 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -19,7 +19,8 @@ use super::sharding::ShardingConfig; use super::system::MemoryPoolConfig; use super::tcp::TcpSocketConfig; -use crate::configs::cluster::{ClusterConfig, ClusterNodeConfig, NodeConfig}; +use crate::configs::cluster::CurrentNodeConfig; +use crate::configs::cluster::{ClusterConfig, NodeConfig}; use crate::configs::http::{ HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig, }; @@ -546,20 +547,7 @@ impl Default for ClusterConfig { enabled: SERVER_CONFIG.cluster.enabled, id: SERVER_CONFIG.cluster.id as u32, name: SERVER_CONFIG.cluster.name.parse().unwrap(), - transport: SERVER_CONFIG.cluster.transport.parse().unwrap(), node: NodeConfig::default(), - nodes: vec![ - ClusterNodeConfig { - id: SERVER_CONFIG.cluster.nodes[0].id as u32, - name: 
SERVER_CONFIG.cluster.nodes[0].name.parse().unwrap(), - address: SERVER_CONFIG.cluster.nodes[0].address.parse().unwrap(), - }, - ClusterNodeConfig { - id: SERVER_CONFIG.cluster.nodes[1].id as u32, - name: SERVER_CONFIG.cluster.nodes[1].name.parse().unwrap(), - address: SERVER_CONFIG.cluster.nodes[1].address.parse().unwrap(), - }, - ], } } } @@ -567,7 +555,11 @@ impl Default for ClusterConfig { impl Default for NodeConfig { fn default() -> NodeConfig { NodeConfig { - id: SERVER_CONFIG.cluster.node.id as u32, + current: CurrentNodeConfig { + name: SERVER_CONFIG.cluster.node.current.name.parse().unwrap(), + ip: SERVER_CONFIG.cluster.node.current.ip.parse().unwrap(), + }, + others: vec![], // Empty by default, will be populated from config if present } } } diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs index 792b8f434..3a4a66a16 100644 --- a/core/server/src/configs/validators.rs +++ b/core/server/src/configs/validators.rs @@ -337,77 +337,70 @@ impl Validatable<ConfigError> for ClusterConfig { return Err(ConfigError::InvalidConfiguration); } - // Validate nodes list is not empty - if self.nodes.is_empty() { - eprintln!("Invalid cluster configuration: nodes list cannot be empty"); + // Validate current node name is not empty + if self.node.current.name.trim().is_empty() { + eprintln!("Invalid cluster configuration: current node name cannot be empty"); return Err(ConfigError::InvalidConfiguration); } - // Check if nodes start from ID 0 - let has_node_zero = self.nodes.iter().any(|node| node.id == 0); - if !has_node_zero { - eprintln!("Invalid cluster configuration: nodes must start from ID 0"); - return Err(ConfigError::InvalidConfiguration); - } - - // Check if current node ID exists in nodes vector - let current_node_exists = self.nodes.iter().any(|node| node.id == self.node.id); - if !current_node_exists { - eprintln!( - "Invalid cluster configuration: current node ID {} not found in nodes list", - self.node.id - ); - return 
Err(ConfigError::InvalidConfiguration); - } + // Check for duplicate node names among other nodes + let mut node_names = std::collections::HashSet::new(); + node_names.insert(self.node.current.name.clone()); - // Check for duplicate node IDs - let mut node_ids = std::collections::HashSet::new(); - for node in &self.nodes { - if !node_ids.insert(node.id) { + for node in &self.node.others { + if !node_names.insert(node.name.clone()) { eprintln!( - "Invalid cluster configuration: duplicate node ID {} found", - node.id + "Invalid cluster configuration: duplicate node name '{}' found", + node.name ); return Err(ConfigError::InvalidConfiguration); } } - // Validate unique addresses (IP:port combinations) - let mut addresses = std::collections::HashSet::new(); - for node in &self.nodes { - // Validate address format (should contain IP:port or [IPv6]:port) - let is_valid_address = if node.address.starts_with('[') { - // IPv6 address format: [::1]:8090 - node.address.contains("]:") && node.address.matches(':').count() >= 2 - } else { - // IPv4 address format: 127.0.0.1:8090 - node.address.matches(':').count() == 1 - }; - - if !is_valid_address { - eprintln!( - "Invalid cluster configuration: malformed address '{}' for node ID {}", - node.address, node.id - ); + // Validate each other node configuration + let mut used_endpoints = std::collections::HashSet::new(); + for node in &self.node.others { + // Validate node name is not empty + if node.name.trim().is_empty() { + eprintln!("Invalid cluster configuration: node name cannot be empty"); return Err(ConfigError::InvalidConfiguration); } - // Check for duplicate full addresses - if !addresses.insert(node.address.clone()) { + // Validate IP is not empty + if node.ip.trim().is_empty() { eprintln!( - "Invalid cluster configuration: duplicate address {} found (node ID: {})", - node.address, node.id + "Invalid cluster configuration: IP cannot be empty for node '{}'", + node.name ); return Err(ConfigError::InvalidConfiguration); } 
- // Validate node name is not empty - if node.name.trim().is_empty() { - eprintln!( - "Invalid cluster configuration: node name cannot be empty for node ID {}", - node.id - ); - return Err(ConfigError::InvalidConfiguration); + // Validate transport ports + let ports = [ + ("TCP", node.ports.tcp), + ("QUIC", node.ports.quic), + ("HTTP", node.ports.http), + ("WebSocket", node.ports.websocket), + ]; + + for (name, port) in &ports { + if *port == 0 { + eprintln!( + "Invalid cluster configuration: {} port cannot be 0 for node '{}'", + name, node.name + ); + return Err(ConfigError::InvalidConfiguration); + } + + // Check for port conflicts across nodes on the same IP + let endpoint = format!("{}:{}:{}", node.ip, name, port); + if !used_endpoints.insert(endpoint.clone()) { + eprintln!( + "Invalid cluster configuration: port conflict - {}:{} is already used", + node.ip, port + ); + return Err(ConfigError::InvalidConfiguration); + } } } diff --git a/core/server/src/shard/system/cluster.rs b/core/server/src/shard/system/cluster.rs index e4af6d724..91cda8c1b 100644 --- a/core/server/src/shard/system/cluster.rs +++ b/core/server/src/shard/system/cluster.rs @@ -18,7 +18,10 @@ use crate::shard::IggyShard; use crate::streaming::session::Session; -use iggy_common::{ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError}; +use crate::streaming::utils::address::extract_port; +use iggy_common::{ + ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError, TransportEndpoints, +}; use tracing::trace; impl IggyShard { @@ -34,53 +37,99 @@ impl IggyShard { let cluster_name = self.config.cluster.name.clone(); let cluster_id = self.config.cluster.id; + let current_node_name = self.config.cluster.node.current.name.clone(); - // Cannot fail because we validated it in config - let transport = self.config.cluster.transport; - - let own_node_id = self.config.cluster.node.id; - - let nodes: Vec<ClusterNode> = self - .config - .cluster - .nodes - .iter() - 
.map(|node_config| { - let (role, status) = if node_config.id == own_node_id { - ( - if self.is_follower { - ClusterNodeRole::Follower - } else { - ClusterNodeRole::Leader - }, - ClusterNodeStatus::Healthy, - ) + let mut nodes = Vec::new(); + + let current_ip = self.config.cluster.node.current.ip.clone(); + + // Get the actual bound ports from the shard's bound addresses + let current_endpoints = self.get_actual_bound_ports().unwrap_or_else(|| { + TransportEndpoints::new( + extract_port(&self.config.tcp.address), + extract_port(&self.config.quic.address), + extract_port(&self.config.http.address), + extract_port(&self.config.websocket.address), + ) + }); + + nodes.push(ClusterNode { + name: current_node_name.clone(), + ip: current_ip, + endpoints: current_endpoints, + role: if self.is_follower { + ClusterNodeRole::Follower + } else { + ClusterNodeRole::Leader + }, + status: ClusterNodeStatus::Healthy, + }); + + for other_node in &self.config.cluster.node.others { + let endpoints = TransportEndpoints::new( + other_node.ports.tcp, + other_node.ports.quic, + other_node.ports.http, + other_node.ports.websocket, + ); + + nodes.push(ClusterNode { + name: other_node.name.clone(), + ip: other_node.ip.clone(), + endpoints, + role: if self.is_follower { + ClusterNodeRole::Leader } else { - ( - if self.is_follower { - ClusterNodeRole::Leader - } else { - ClusterNodeRole::Follower - }, - ClusterNodeStatus::Healthy, - ) - }; - - ClusterNode { - id: node_config.id, - name: node_config.name.clone(), - address: node_config.address.clone(), - role, - status, - } - }) - .collect(); + ClusterNodeRole::Follower + }, + status: ClusterNodeStatus::Healthy, + }); + } Ok(ClusterMetadata { name: cluster_name, id: cluster_id, - transport, nodes, }) } + + /// Get actual bound ports from the shard's bound addresses + /// This is needed when server binds to port 0 (OS-assigned port) + fn get_actual_bound_ports(&self) -> Option<TransportEndpoints> { + let tcp_port = self + .tcp_bound_address + 
.get() + .map(|addr| addr.port()) + .unwrap_or_else(|| extract_port(&self.config.tcp.address)); + + let quic_port = self + .quic_bound_address + .get() + .map(|addr| addr.port()) + .unwrap_or_else(|| extract_port(&self.config.quic.address)); + + let http_port = self + .http_bound_address + .get() + .map(|addr| addr.port()) + .unwrap_or_else(|| extract_port(&self.config.http.address)); + + let websocket_port = self + .websocket_bound_address + .get() + .map(|addr| addr.port()) + .unwrap_or_else(|| extract_port(&self.config.websocket.address)); + + trace!( + "Using actual bound ports - TCP: {}, QUIC: {}, HTTP: {}, WebSocket: {}", + tcp_port, quic_port, http_port, websocket_port + ); + + Some(TransportEndpoints::new( + tcp_port, + quic_port, + http_port, + websocket_port, + )) + } } diff --git a/core/server/src/streaming/systems/cluster/mod.rs b/core/server/src/streaming/systems/cluster/mod.rs index 51161280f..09e947ea5 100644 --- a/core/server/src/streaming/systems/cluster/mod.rs +++ b/core/server/src/streaming/systems/cluster/mod.rs @@ -18,11 +18,13 @@ use crate::streaming::session::Session; use crate::streaming::systems::system::System; +use crate::streaming::utils::address::{extract_ip, extract_port}; use iggy_common::ClusterMetadata; use iggy_common::ClusterNode; use iggy_common::ClusterNodeRole; use iggy_common::ClusterNodeStatus; use iggy_common::IggyError; +use iggy_common::TransportEndpoints; use tracing::trace; impl System { @@ -38,35 +40,49 @@ impl System { let name = self.cluster_config.name.clone(); let id = self.cluster_config.id; - let transport = self.cluster_config.transport; + let current_node_name = self.cluster_config.node.current.name.clone(); - let nodes: Vec<ClusterNode> = self - .cluster_config - .nodes - .iter() - .map(|node_config| { - let role = if node_config.id == 1 { - ClusterNodeRole::Leader - } else { - ClusterNodeRole::Follower - }; + // Build nodes list starting with current node + let mut nodes = Vec::new(); - let status = 
ClusterNodeStatus::Healthy; + // Add current node with ports derived from transport configs + let current_ip = extract_ip(&self.tcp_config.address); + let current_endpoints = TransportEndpoints::new( + &current_ip, + extract_port(&self.tcp_config.address), + extract_port(&self.quic_config.address), + extract_port(&self.http_config.address), + extract_port(&self.websocket_config.address), + ); - ClusterNode { - id: node_config.id, - name: node_config.name.clone(), - address: node_config.address.clone(), - role, - status, - } - }) - .collect(); + nodes.push(ClusterNode { + name: current_node_name.clone(), + endpoints: current_endpoints, + role: ClusterNodeRole::Leader, // Placeholder + status: ClusterNodeStatus::Healthy, + }); + + // Add other nodes from configuration + for other_node in &self.cluster_config.node.others { + let endpoints = TransportEndpoints::new( + &other_node.ip, + other_node.ports.tcp, + other_node.ports.quic, + other_node.ports.http, + other_node.ports.websocket, + ); + + nodes.push(ClusterNode { + name: other_node.name.clone(), + endpoints, + role: ClusterNodeRole::Follower, // Placeholder + status: ClusterNodeStatus::Healthy, + }); + } Ok(ClusterMetadata { name, id, - transport, nodes, }) } diff --git a/core/server/src/streaming/utils/address.rs b/core/server/src/streaming/utils/address.rs new file mode 100644 index 000000000..3c51b8e7b --- /dev/null +++ b/core/server/src/streaming/utils/address.rs @@ -0,0 +1,76 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/// Extracts IP from an address string like "127.0.0.1:8090" or "[::1]:8090" +pub fn extract_ip(address: &str) -> String { + if let Some(colon_pos) = address.rfind(':') { + // Handle IPv6 addresses like [::1]:8090 + if address.starts_with('[') + && let Some(bracket_pos) = address.rfind(']') + { + return address[1..bracket_pos].to_string(); + } + // Handle IPv4 addresses like 127.0.0.1:8090 + return address[..colon_pos].to_string(); + } + address.to_string() +} + +/// Extracts port from an address string like "127.0.0.1:8090" +pub fn extract_port(address: &str) -> u16 { + if let Some(colon_pos) = address.rfind(':') + && let Ok(port) = address[colon_pos + 1..].parse::<u16>() + { + return port; + } + 0 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_ip_ipv4() { + assert_eq!(extract_ip("127.0.0.1:8090"), "127.0.0.1"); + assert_eq!(extract_ip("192.168.1.100:3000"), "192.168.1.100"); + } + + #[test] + fn test_extract_ip_ipv6() { + assert_eq!(extract_ip("[::1]:8090"), "::1"); + assert_eq!(extract_ip("[2001:db8::1]:443"), "2001:db8::1"); + } + + #[test] + fn test_extract_ip_no_port() { + assert_eq!(extract_ip("127.0.0.1"), "127.0.0.1"); + } + + #[test] + fn test_extract_port() { + assert_eq!(extract_port("127.0.0.1:8090"), 8090); + assert_eq!(extract_port("192.168.1.100:3000"), 3000); + assert_eq!(extract_port("[::1]:8090"), 8090); + } + + #[test] + fn test_extract_port_no_port() { + assert_eq!(extract_port("127.0.0.1"), 0); + } +} diff --git a/core/server/src/streaming/utils/mod.rs 
b/core/server/src/streaming/utils/mod.rs index 74f5b7cf8..07896474c 100644 --- a/core/server/src/streaming/utils/mod.rs +++ b/core/server/src/streaming/utils/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +pub mod address; pub mod crypto; pub mod file; pub mod hash;
