This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_sharding_config in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 06b906cfa766c13654e2b753daf3ea278088af42 Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Jul 9 12:15:12 2025 +0200 feat(io_uring): add configurable CPU sharding with validation - Add system.sharding.cpu_allocation config to control thread-to-CPU mapping - Support "all" (default), numeric (e.g. 4), and range (e.g. "5..8") values - Always pin threads to specific CPU cores for consistent performance - Validate configuration against available CPU cores with descriptive errors --- core/configs/server.toml | 9 ++ .../binary/handlers/topics/update_topic_handler.rs | 2 +- core/server/src/bootstrap.rs | 11 +- core/server/src/configs/defaults.rs | 2 + core/server/src/configs/mod.rs | 1 + core/server/src/configs/sharding.rs | 114 +++++++++++++++++++++ core/server/src/configs/system.rs | 2 + core/server/src/configs/validators.rs | 48 +++++++++ core/server/src/main.rs | 35 +++++-- core/server/src/shard/mod.rs | 12 +-- core/server/src/shard/system/topics.rs | 3 +- 11 files changed, 217 insertions(+), 22 deletions(-) diff --git a/core/configs/server.toml b/core/configs/server.toml index 5445ff18..e235baa8 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -541,3 +541,12 @@ size = "4 GiB" # and holds different buffer sizes, from 256 B to 512 MiB. # Note: This number has to be a power of 2. Minimum value is 128 due to internal implementation details. bucket_capacity = 8192 + +# Sharding configuration +[system.sharding] +# CPU allocation - controls the number of shards and their CPU affinity. +# Possible values: +# - "all": Use all available CPU cores (default) +# - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3) +# - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7 +cpu_allocation = "all" diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs index 3e1ec864..31d31fff 100644 --- a/core/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs @@ -19,8 +19,8 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; -use crate::shard::transmission::event::ShardEvent; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 68419ba9..8b53a81c 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -23,13 +23,16 @@ use crate::{ utils::file::overwrite, }, }; -use std::{collections::HashSet, env, ops::Range, path::Path, sync::Arc}; +use std::{collections::HashSet, env, path::Path, sync::Arc}; pub fn create_shard_connections( - shards_set: Range<usize>, + shards_set: &HashSet<usize>, ) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) { let shards_count = shards_set.len(); - let connectors: Vec<ShardConnector<ShardFrame>> = shards_set + let mut shards_vec: Vec<usize> = shards_set.iter().cloned().collect(); + shards_vec.sort(); + + let connectors: Vec<ShardConnector<ShardFrame>> = shards_vec .into_iter() .map(|id| ShardConnector::new(id as u16, shards_count)) .collect(); @@ -122,7 +125,7 @@ pub fn create_root_user() -> User { } pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime { - // TODO: The event intererval tick, could be configured based on the fact + // TODO: The event interval tick, could be configured based on the fact // How many clients we expect to have connected. // This roughly estimates the number of tasks we will create. diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index dd74ade7..01f7140b 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -16,6 +16,7 @@ * under the License. */ +use super::sharding::ShardingConfig; use super::system::MemoryPoolConfig; use super::tcp::TcpSocketConfig; use crate::configs::http::{ @@ -324,6 +325,7 @@ impl Default for SystemConfig { message_deduplication: MessageDeduplicationConfig::default(), recovery: RecoveryConfig::default(), memory_pool: MemoryPoolConfig::default(), + sharding: ShardingConfig::default(), } } } diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs/mod.rs index 1a45ecf9..3055046b 100644 --- a/core/server/src/configs/mod.rs +++ b/core/server/src/configs/mod.rs @@ -23,6 +23,7 @@ pub mod displays; pub mod http; pub mod quic; pub mod server; +pub mod sharding; pub mod system; pub mod tcp; pub mod validators; diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs new file mode 100644 index 00000000..87679676 --- /dev/null +++ b/core/server/src/configs/sharding.rs @@ -0,0 +1,114 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use serde::{Deserialize, Deserializer, Serialize}; +use std::collections::HashSet; +use std::str::FromStr; +use std::thread::available_parallelism; + +#[derive(Debug, Deserialize, Serialize)] +pub struct ShardingConfig { + #[serde(default)] + pub cpu_allocation: CpuAllocation, +} + +impl Default for ShardingConfig { + fn default() -> Self { + ShardingConfig { + cpu_allocation: CpuAllocation::default(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize)] +pub enum CpuAllocation { + All, + Count(usize), + Range(usize, usize), +} + +impl Default for CpuAllocation { + fn default() -> Self { + CpuAllocation::All + } +} + +impl CpuAllocation { + pub fn to_shard_set(&self) -> HashSet<usize> { + match self { + CpuAllocation::All => { + let available_cpus = available_parallelism() + .expect("Failed to get num of cores") + .get(); + (0..available_cpus).collect() + } + CpuAllocation::Count(count) => (0..*count).collect(), + CpuAllocation::Range(start, end) => (*start..*end).collect(), + } + } +} + +impl FromStr for CpuAllocation { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "all" => Ok(CpuAllocation::All), + s if s.contains("..") => { + let parts: Vec<&str> = s.split("..").collect(); + if parts.len() != 2 { + return Err(format!("Invalid range format: {s}. Expected 'start..end'")); + } + let start = parts[0] + .parse::<usize>() + .map_err(|_| format!("Invalid start value: {}", parts[0]))?; + let end = parts[1] + .parse::<usize>() + .map_err(|_| format!("Invalid end value: {}", parts[1]))?; + Ok(CpuAllocation::Range(start, end)) + } + s => { + let count = s + .parse::<usize>() + .map_err(|_| format!("Invalid shard count: {s}"))?; + Ok(CpuAllocation::Count(count)) + } + } + } +} + +impl<'de> Deserialize<'de> for CpuAllocation { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum CpuAllocationHelper { + String(String), + Number(usize), + } + + match CpuAllocationHelper::deserialize(deserializer)? { + CpuAllocationHelper::String(s) => { + CpuAllocation::from_str(&s).map_err(serde::de::Error::custom) + } + CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)), + } + } +} diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs index a5193294..54a277cb 100644 --- a/core/server/src/configs/system.rs +++ b/core/server/src/configs/system.rs @@ -17,6 +17,7 @@ */ use super::cache_indexes::CacheIndexesConfig; +use super::sharding::ShardingConfig; use iggy_common::IggyByteSize; use iggy_common::IggyExpiry; use iggy_common::MaxTopicSize; @@ -41,6 +42,7 @@ pub struct SystemConfig { pub message_deduplication: MessageDeduplicationConfig, pub recovery: RecoveryConfig, pub memory_pool: MemoryPoolConfig, + pub sharding: ShardingConfig, } #[derive(Debug, Deserialize, Serialize)] diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs index 1304430c..a52853d4 100644 --- a/core/server/src/configs/validators.rs +++ b/core/server/src/configs/validators.rs @@ -22,6 +22,7 @@ use super::server::{ ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, StateMaintenanceConfig, TelemetryConfig, }; +use super::sharding::{CpuAllocation, ShardingConfig}; use super::system::{CompressionConfig, MemoryPoolConfig, PartitionConfig}; use crate::archiver::ArchiverKindType; use crate::configs::COMPONENT; @@ -34,6 +35,7 @@ use iggy_common::CompressionAlgorithm; use iggy_common::IggyExpiry; use iggy_common::MaxTopicSize; use iggy_common::Validatable; +use std::thread::available_parallelism; use tracing::error; impl Validatable<ConfigError> for ServerConfig { @@ -68,6 +70,12 @@ impl Validatable<ConfigError> for ServerConfig { self.telemetry.validate().with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to validate telemetry config") })?; + self.system + .sharding + .validate() + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to validate sharding config") + })?; let topic_size = match self.system.topic.max_size { MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()), @@ -352,3 +360,43 @@ impl Validatable<ConfigError> for MemoryPoolConfig { Ok(()) } } + +impl Validatable<ConfigError> for ShardingConfig { + fn validate(&self) -> Result<(), ConfigError> { + let available_cpus = available_parallelism() + .expect("Failed to get number of CPU cores") + .get(); + + match &self.cpu_allocation { + CpuAllocation::All => Ok(()), + CpuAllocation::Count(count) => { + if *count == 0 { + eprintln!("Invalid sharding configuration: cpu_allocation count cannot be 0"); + return Err(ConfigError::InvalidConfiguration); + } + if *count > available_cpus { + eprintln!( + "Invalid sharding configuration: cpu_allocation count {count} exceeds available CPU cores {available_cpus}" + ); + return Err(ConfigError::InvalidConfiguration); + } + Ok(()) + } + CpuAllocation::Range(start, end) => { + if start >= end { + eprintln!( + "Invalid sharding configuration: cpu_allocation range {start}..{end} is invalid (start must be less than end)" + ); + return Err(ConfigError::InvalidConfiguration); + } + if *end > available_cpus { + eprintln!( + "Invalid sharding configuration: cpu_allocation range {start}..{end} exceeds available CPU cores (max: {available_cpus})" + ); + return Err(ConfigError::InvalidConfiguration); + } + Ok(()) + } + } + } +} diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 2602e3c8..e01f03ec 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -19,7 +19,6 @@ use std::collections::HashSet; use std::rc::Rc; use std::sync::Arc; -use std::thread::available_parallelism; use anyhow::Result; use clap::Parser; @@ -36,12 +35,13 @@ use server::bootstrap::{ }; use server::configs::config_provider::{self}; use server::configs::server::ServerConfig; +use server::configs::sharding::CpuAllocation; use server::io::fs_utils; #[cfg(not(feature = "tokio-console"))] use server::log::logger::Logging; #[cfg(feature = "tokio-console")] use server::log::tokio_console::Logging; -use server::server_error::ServerError; +use server::server_error::{ConfigError, ServerError}; use server::shard::IggyShard; use server::shard::gate::Barrier; use server::state::StateKind; @@ -122,12 +122,28 @@ fn main() -> Result<(), ServerError> { // From this point on, we can use tracing macros to log messages. logging.late_init(config.system.get_system_path(), &config.system.logging)?; - // TODO: Make this configurable from config as a range - // for example this instance of Iggy will use cores from 0..4 - let available_cpus = available_parallelism().expect("Failed to get num of cores"); - let shards_count = available_cpus.into(); - let shards_set = 0..shards_count; - let (connections, shutdown_handles) = create_shard_connections(shards_set.clone()); + let shards_set = config.system.sharding.cpu_allocation.to_shard_set(); + + match &config.system.sharding.cpu_allocation { + CpuAllocation::All => { + info!( + "Using all available CPU cores ({} shards with affinity)", + shards_set.len() + ); + } + CpuAllocation::Count(count) => { + info!("Using {count} shards with affinity to cores 0..{count}"); + } + CpuAllocation::Range(start, end) => { + info!( + "Using {} shards with affinity to cores {start}..{end}", + end - start + ); + } + } + + info!("Starting {} shard(s)", shards_set.len()); + let (connections, shutdown_handles) = create_shard_connections(&shards_set); let barrier = Arc::new(Barrier::new()); let mut handles = Vec::with_capacity(shards_set.len()); @@ -142,7 +158,8 @@ fn main() -> Result<(), ServerError> { .name(format!("shard-{id}")) .spawn(move || { MemoryPool::init_pool(config.system.clone()); - let rt = create_shard_executor(HashSet::from([shard_id])); + let affinity_set = HashSet::from([shard_id]); + let rt = create_shard_executor(affinity_set); rt.block_on(async move { let version = SemanticVersion::current().expect("Invalid version"); info!( diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index aff112b1..87b43e8c 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -665,13 +665,10 @@ impl IggyShard { })?; Ok(()) } - ShardEvent::UpdatedStream { - stream_id, - name, - } => { + ShardEvent::UpdatedStream { stream_id, name } => { self.update_stream_bypass_auth(stream_id, name)?; Ok(()) - }, + } ShardEvent::UpdatedTopic { stream_id, topic_id, @@ -689,9 +686,10 @@ impl IggyShard { *compression_algorithm, *max_topic_size, *replication_factor, - ).await?; + ) + .await?; Ok(()) - }, + } ShardEvent::PurgedStream { stream_id: _ } => todo!(), ShardEvent::CreatedConsumerGroup { stream_id: _, diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 7406a11c..4a76c0df 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -239,7 +239,8 @@ impl IggyShard { compression_algorithm, max_topic_size, replication_factor, - ).await + ) + .await } #[allow(clippy::too_many_arguments)]
