This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 165f5a7d feat(io_uring): add configurable CPU sharding with validation (#1982)
165f5a7d is described below
commit 165f5a7df0de77fff42b9176a066ccf61525f1aa
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jul 9 18:27:39 2025 +0200
feat(io_uring): add configurable CPU sharding with validation (#1982)
- Add system.sharding.cpu_allocation config to control thread-to-CPU mapping
- Support "all" (default), a numeric core count (e.g. 4), and an end-exclusive range (e.g. "5..8") as values
- Always pin shard threads to specific CPU cores for consistent performance
- Validate the configuration against the available CPU cores, with descriptive errors
---
core/configs/server.toml | 9 +++
core/server/src/bootstrap.rs | 11 ++--
core/server/src/configs/defaults.rs | 2 +
core/server/src/configs/mod.rs | 1 +
core/server/src/configs/sharding.rs | 114 ++++++++++++++++++++++++++++++++++
core/server/src/configs/system.rs | 2 +
core/server/src/configs/validators.rs | 48 ++++++++++++++
core/server/src/main.rs | 35 ++++++++---
core/server/src/shard/mod.rs | 12 ++--
9 files changed, 214 insertions(+), 20 deletions(-)
diff --git a/core/configs/server.toml b/core/configs/server.toml
index 5445ff18..e235baa8 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -541,3 +541,12 @@ size = "4 GiB"
# and holds different buffer sizes, from 256 B to 512 MiB.
# Note: This number has to be a power of 2. Minimum value is 128 due to internal implementation details.
bucket_capacity = 8192
+
+# Sharding configuration
+[system.sharding]
+# CPU allocation - controls the number of shards and their CPU affinity.
+# Possible values:
+# - "all": Use all available CPU cores (default)
+# - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3)
+# - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7 (the end of the range is exclusive)
+cpu_allocation = "all"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 68419ba9..8b53a81c 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -23,13 +23,16 @@ use crate::{
utils::file::overwrite,
},
};
-use std::{collections::HashSet, env, ops::Range, path::Path, sync::Arc};
+use std::{collections::HashSet, env, path::Path, sync::Arc};
pub fn create_shard_connections(
- shards_set: Range<usize>,
+ shards_set: &HashSet<usize>,
) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
let shards_count = shards_set.len();
- let connectors: Vec<ShardConnector<ShardFrame>> = shards_set
+ let mut shards_vec: Vec<usize> = shards_set.iter().cloned().collect();
+ shards_vec.sort();
+
+ let connectors: Vec<ShardConnector<ShardFrame>> = shards_vec
.into_iter()
.map(|id| ShardConnector::new(id as u16, shards_count))
.collect();
@@ -122,7 +125,7 @@ pub fn create_root_user() -> User {
}
pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
- // TODO: The event intererval tick, could be configured based on the fact
+ // TODO: The event interval tick, could be configured based on the fact
// How many clients we expect to have connected.
// This roughly estimates the number of tasks we will create.
diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs
index dd74ade7..01f7140b 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use super::sharding::ShardingConfig;
use super::system::MemoryPoolConfig;
use super::tcp::TcpSocketConfig;
use crate::configs::http::{
@@ -324,6 +325,7 @@ impl Default for SystemConfig {
message_deduplication: MessageDeduplicationConfig::default(),
recovery: RecoveryConfig::default(),
memory_pool: MemoryPoolConfig::default(),
+ sharding: ShardingConfig::default(),
}
}
}
diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs/mod.rs
index 1a45ecf9..3055046b 100644
--- a/core/server/src/configs/mod.rs
+++ b/core/server/src/configs/mod.rs
@@ -23,6 +23,7 @@ pub mod displays;
pub mod http;
pub mod quic;
pub mod server;
+pub mod sharding;
pub mod system;
pub mod tcp;
pub mod validators;
diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs
new file mode 100644
index 00000000..87679676
--- /dev/null
+++ b/core/server/src/configs/sharding.rs
@@ -0,0 +1,114 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Deserializer, Serialize};
+use std::collections::HashSet;
+use std::str::FromStr;
+use std::thread::available_parallelism;
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct ShardingConfig {
+ #[serde(default)]
+ pub cpu_allocation: CpuAllocation,
+}
+
+impl Default for ShardingConfig {
+ fn default() -> Self {
+ ShardingConfig {
+ cpu_allocation: CpuAllocation::default(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize)]
+pub enum CpuAllocation {
+ All,
+ Count(usize),
+ Range(usize, usize),
+}
+
+impl Default for CpuAllocation {
+ fn default() -> Self {
+ CpuAllocation::All
+ }
+}
+
+impl CpuAllocation {
+ pub fn to_shard_set(&self) -> HashSet<usize> {
+ match self {
+ CpuAllocation::All => {
+ let available_cpus = available_parallelism()
+ .expect("Failed to get num of cores")
+ .get();
+ (0..available_cpus).collect()
+ }
+ CpuAllocation::Count(count) => (0..*count).collect(),
+ CpuAllocation::Range(start, end) => (*start..*end).collect(),
+ }
+ }
+}
+
+impl FromStr for CpuAllocation {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "all" => Ok(CpuAllocation::All),
+ s if s.contains("..") => {
+ let parts: Vec<&str> = s.split("..").collect();
+ if parts.len() != 2 {
+ return Err(format!("Invalid range format: {s}. Expected
'start..end'"));
+ }
+ let start = parts[0]
+ .parse::<usize>()
+ .map_err(|_| format!("Invalid start value: {}",
parts[0]))?;
+ let end = parts[1]
+ .parse::<usize>()
+ .map_err(|_| format!("Invalid end value: {}", parts[1]))?;
+ Ok(CpuAllocation::Range(start, end))
+ }
+ s => {
+ let count = s
+ .parse::<usize>()
+ .map_err(|_| format!("Invalid shard count: {s}"))?;
+ Ok(CpuAllocation::Count(count))
+ }
+ }
+ }
+}
+
+impl<'de> Deserialize<'de> for CpuAllocation {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ #[derive(Deserialize)]
+ #[serde(untagged)]
+ enum CpuAllocationHelper {
+ String(String),
+ Number(usize),
+ }
+
+ match CpuAllocationHelper::deserialize(deserializer)? {
+ CpuAllocationHelper::String(s) => {
+ CpuAllocation::from_str(&s).map_err(serde::de::Error::custom)
+ }
+ CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)),
+ }
+ }
+}
diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs
index a5193294..54a277cb 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -17,6 +17,7 @@
*/
use super::cache_indexes::CacheIndexesConfig;
+use super::sharding::ShardingConfig;
use iggy_common::IggyByteSize;
use iggy_common::IggyExpiry;
use iggy_common::MaxTopicSize;
@@ -41,6 +42,7 @@ pub struct SystemConfig {
pub message_deduplication: MessageDeduplicationConfig,
pub recovery: RecoveryConfig,
pub memory_pool: MemoryPoolConfig,
+ pub sharding: ShardingConfig,
}
#[derive(Debug, Deserialize, Serialize)]
diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs
index 1304430c..a52853d4 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -22,6 +22,7 @@ use super::server::{
ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig,
MessagesMaintenanceConfig,
StateMaintenanceConfig, TelemetryConfig,
};
+use super::sharding::{CpuAllocation, ShardingConfig};
use super::system::{CompressionConfig, MemoryPoolConfig, PartitionConfig};
use crate::archiver::ArchiverKindType;
use crate::configs::COMPONENT;
@@ -34,6 +35,7 @@ use iggy_common::CompressionAlgorithm;
use iggy_common::IggyExpiry;
use iggy_common::MaxTopicSize;
use iggy_common::Validatable;
+use std::thread::available_parallelism;
use tracing::error;
impl Validatable<ConfigError> for ServerConfig {
@@ -68,6 +70,12 @@ impl Validatable<ConfigError> for ServerConfig {
self.telemetry.validate().with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to validate
telemetry config")
})?;
+ self.system
+ .sharding
+ .validate()
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to validate
sharding config")
+ })?;
let topic_size = match self.system.topic.max_size {
MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
@@ -352,3 +360,43 @@ impl Validatable<ConfigError> for MemoryPoolConfig {
Ok(())
}
}
+
+impl Validatable<ConfigError> for ShardingConfig {
+ fn validate(&self) -> Result<(), ConfigError> {
+ let available_cpus = available_parallelism()
+ .expect("Failed to get number of CPU cores")
+ .get();
+
+ match &self.cpu_allocation {
+ CpuAllocation::All => Ok(()),
+ CpuAllocation::Count(count) => {
+ if *count == 0 {
+ eprintln!("Invalid sharding configuration: cpu_allocation
count cannot be 0");
+ return Err(ConfigError::InvalidConfiguration);
+ }
+ if *count > available_cpus {
+ eprintln!(
+ "Invalid sharding configuration: cpu_allocation count
{count} exceeds available CPU cores {available_cpus}"
+ );
+ return Err(ConfigError::InvalidConfiguration);
+ }
+ Ok(())
+ }
+ CpuAllocation::Range(start, end) => {
+ if start >= end {
+ eprintln!(
+ "Invalid sharding configuration: cpu_allocation range
{start}..{end} is invalid (start must be less than end)"
+ );
+ return Err(ConfigError::InvalidConfiguration);
+ }
+ if *end > available_cpus {
+ eprintln!(
+ "Invalid sharding configuration: cpu_allocation range
{start}..{end} exceeds available CPU cores (max: {available_cpus})"
+ );
+ return Err(ConfigError::InvalidConfiguration);
+ }
+ Ok(())
+ }
+ }
+ }
+}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2602e3c8..e01f03ec 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -19,7 +19,6 @@
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::Arc;
-use std::thread::available_parallelism;
use anyhow::Result;
use clap::Parser;
@@ -36,12 +35,13 @@ use server::bootstrap::{
};
use server::configs::config_provider::{self};
use server::configs::server::ServerConfig;
+use server::configs::sharding::CpuAllocation;
use server::io::fs_utils;
#[cfg(not(feature = "tokio-console"))]
use server::log::logger::Logging;
#[cfg(feature = "tokio-console")]
use server::log::tokio_console::Logging;
-use server::server_error::ServerError;
+use server::server_error::{ConfigError, ServerError};
use server::shard::IggyShard;
use server::shard::gate::Barrier;
use server::state::StateKind;
@@ -122,12 +122,28 @@ fn main() -> Result<(), ServerError> {
// From this point on, we can use tracing macros to log messages.
logging.late_init(config.system.get_system_path(),
&config.system.logging)?;
- // TODO: Make this configurable from config as a range
- // for example this instance of Iggy will use cores from 0..4
- let available_cpus = available_parallelism().expect("Failed to get num of
cores");
- let shards_count = available_cpus.into();
- let shards_set = 0..shards_count;
- let (connections, shutdown_handles) =
create_shard_connections(shards_set.clone());
+ let shards_set = config.system.sharding.cpu_allocation.to_shard_set();
+
+ match &config.system.sharding.cpu_allocation {
+ CpuAllocation::All => {
+ info!(
+ "Using all available CPU cores ({} shards with affinity)",
+ shards_set.len()
+ );
+ }
+ CpuAllocation::Count(count) => {
+ info!("Using {count} shards with affinity to cores 0..{count}");
+ }
+ CpuAllocation::Range(start, end) => {
+ info!(
+ "Using {} shards with affinity to cores {start}..{end}",
+ end - start
+ );
+ }
+ }
+
+ info!("Starting {} shard(s)", shards_set.len());
+ let (connections, shutdown_handles) =
create_shard_connections(&shards_set);
let barrier = Arc::new(Barrier::new());
let mut handles = Vec::with_capacity(shards_set.len());
@@ -142,7 +158,8 @@ fn main() -> Result<(), ServerError> {
.name(format!("shard-{id}"))
.spawn(move || {
MemoryPool::init_pool(config.system.clone());
- let rt = create_shard_executor(HashSet::from([shard_id]));
+ let affinity_set = HashSet::from([shard_id]);
+ let rt = create_shard_executor(affinity_set);
rt.block_on(async move {
let version = SemanticVersion::current().expect("Invalid
version");
info!(
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index d4358f66..a8158bc1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -665,13 +665,10 @@ impl IggyShard {
})?;
Ok(())
}
- ShardEvent::UpdatedStream {
- stream_id,
- name,
- } => {
+ ShardEvent::UpdatedStream { stream_id, name } => {
self.update_stream_bypass_auth(stream_id, name)?;
Ok(())
- },
+ }
ShardEvent::UpdatedTopic {
stream_id,
topic_id,
@@ -689,9 +686,10 @@ impl IggyShard {
*compression_algorithm,
*max_topic_size,
*replication_factor,
- ).await?;
+ )
+ .await?;
Ok(())
- },
+ }
ShardEvent::PurgedStream { stream_id: _ } => todo!(),
ShardEvent::CreatedConsumerGroup {
stream_id: _,