This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 165f5a7d feat(io_uring): add configurable CPU sharding with validation (#1982)
165f5a7d is described below

commit 165f5a7df0de77fff42b9176a066ccf61525f1aa
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jul 9 18:27:39 2025 +0200

    feat(io_uring): add configurable CPU sharding with validation (#1982)
    
    - Add system.sharding.cpu_allocation config to control thread-to-CPU
    mapping
    - Support "all" (default), numeric (e.g. 4), and range (e.g. "5..8")
    values
    - Always pin threads to specific CPU cores for consistent performance
    - Validate configuration against available CPU cores with descriptive
    errors
---
 core/configs/server.toml              |   9 +++
 core/server/src/bootstrap.rs          |  11 ++--
 core/server/src/configs/defaults.rs   |   2 +
 core/server/src/configs/mod.rs        |   1 +
 core/server/src/configs/sharding.rs   | 114 ++++++++++++++++++++++++++++++++++
 core/server/src/configs/system.rs     |   2 +
 core/server/src/configs/validators.rs |  48 ++++++++++++++
 core/server/src/main.rs               |  35 ++++++++---
 core/server/src/shard/mod.rs          |  12 ++--
 9 files changed, 214 insertions(+), 20 deletions(-)

diff --git a/core/configs/server.toml b/core/configs/server.toml
index 5445ff18..e235baa8 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -541,3 +541,12 @@ size = "4 GiB"
 # and holds different buffer sizes, from 256 B to 512 MiB.
 # Note: This number has to be a power of 2. Minimum value is 128 due to internal implementation details.
 bucket_capacity = 8192
+
+# Sharding configuration
+[system.sharding]
+# CPU allocation - controls the number of shards and their CPU affinity.
+# Possible values:
+# - "all": Use all available CPU cores (default)
+# - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3)
+# - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7
+cpu_allocation = "all"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 68419ba9..8b53a81c 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -23,13 +23,16 @@ use crate::{
         utils::file::overwrite,
     },
 };
-use std::{collections::HashSet, env, ops::Range, path::Path, sync::Arc};
+use std::{collections::HashSet, env, path::Path, sync::Arc};
 
 pub fn create_shard_connections(
-    shards_set: Range<usize>,
+    shards_set: &HashSet<usize>,
 ) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
     let shards_count = shards_set.len();
-    let connectors: Vec<ShardConnector<ShardFrame>> = shards_set
+    let mut shards_vec: Vec<usize> = shards_set.iter().cloned().collect();
+    shards_vec.sort();
+
+    let connectors: Vec<ShardConnector<ShardFrame>> = shards_vec
         .into_iter()
         .map(|id| ShardConnector::new(id as u16, shards_count))
         .collect();
@@ -122,7 +125,7 @@ pub fn create_root_user() -> User {
 }
 
 pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
-    // TODO: The event intererval tick, could be configured based on the fact
+    // TODO: The event interval tick, could be configured based on the fact
     // How many clients we expect to have connected.
     // This roughly estimates the number of tasks we will create.
 
diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs
index dd74ade7..01f7140b 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use super::sharding::ShardingConfig;
 use super::system::MemoryPoolConfig;
 use super::tcp::TcpSocketConfig;
 use crate::configs::http::{
@@ -324,6 +325,7 @@ impl Default for SystemConfig {
             message_deduplication: MessageDeduplicationConfig::default(),
             recovery: RecoveryConfig::default(),
             memory_pool: MemoryPoolConfig::default(),
+            sharding: ShardingConfig::default(),
         }
     }
 }
diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs/mod.rs
index 1a45ecf9..3055046b 100644
--- a/core/server/src/configs/mod.rs
+++ b/core/server/src/configs/mod.rs
@@ -23,6 +23,7 @@ pub mod displays;
 pub mod http;
 pub mod quic;
 pub mod server;
+pub mod sharding;
 pub mod system;
 pub mod tcp;
 pub mod validators;
diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs
new file mode 100644
index 00000000..87679676
--- /dev/null
+++ b/core/server/src/configs/sharding.rs
@@ -0,0 +1,114 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Deserializer, Serialize};
+use std::collections::HashSet;
+use std::str::FromStr;
+use std::thread::available_parallelism;
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct ShardingConfig {
+    #[serde(default)]
+    pub cpu_allocation: CpuAllocation,
+}
+
+impl Default for ShardingConfig {
+    fn default() -> Self {
+        ShardingConfig {
+            cpu_allocation: CpuAllocation::default(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize)]
+pub enum CpuAllocation {
+    All,
+    Count(usize),
+    Range(usize, usize),
+}
+
+impl Default for CpuAllocation {
+    fn default() -> Self {
+        CpuAllocation::All
+    }
+}
+
+impl CpuAllocation {
+    pub fn to_shard_set(&self) -> HashSet<usize> {
+        match self {
+            CpuAllocation::All => {
+                let available_cpus = available_parallelism()
+                    .expect("Failed to get num of cores")
+                    .get();
+                (0..available_cpus).collect()
+            }
+            CpuAllocation::Count(count) => (0..*count).collect(),
+            CpuAllocation::Range(start, end) => (*start..*end).collect(),
+        }
+    }
+}
+
+impl FromStr for CpuAllocation {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "all" => Ok(CpuAllocation::All),
+            s if s.contains("..") => {
+                let parts: Vec<&str> = s.split("..").collect();
+                if parts.len() != 2 {
+                    return Err(format!("Invalid range format: {s}. Expected 'start..end'"));
+                }
+                let start = parts[0]
+                    .parse::<usize>()
+                    .map_err(|_| format!("Invalid start value: {}", parts[0]))?;
+                let end = parts[1]
+                    .parse::<usize>()
+                    .map_err(|_| format!("Invalid end value: {}", parts[1]))?;
+                Ok(CpuAllocation::Range(start, end))
+            }
+            s => {
+                let count = s
+                    .parse::<usize>()
+                    .map_err(|_| format!("Invalid shard count: {s}"))?;
+                Ok(CpuAllocation::Count(count))
+            }
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for CpuAllocation {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        #[derive(Deserialize)]
+        #[serde(untagged)]
+        enum CpuAllocationHelper {
+            String(String),
+            Number(usize),
+        }
+
+        match CpuAllocationHelper::deserialize(deserializer)? {
+            CpuAllocationHelper::String(s) => {
+                CpuAllocation::from_str(&s).map_err(serde::de::Error::custom)
+            }
+            CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)),
+        }
+    }
+}
diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs
index a5193294..54a277cb 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -17,6 +17,7 @@
  */
 
 use super::cache_indexes::CacheIndexesConfig;
+use super::sharding::ShardingConfig;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyExpiry;
 use iggy_common::MaxTopicSize;
@@ -41,6 +42,7 @@ pub struct SystemConfig {
     pub message_deduplication: MessageDeduplicationConfig,
     pub recovery: RecoveryConfig,
     pub memory_pool: MemoryPoolConfig,
+    pub sharding: ShardingConfig,
 }
 
 #[derive(Debug, Deserialize, Serialize)]
diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs
index 1304430c..a52853d4 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -22,6 +22,7 @@ use super::server::{
     ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig,
     StateMaintenanceConfig, TelemetryConfig,
 };
+use super::sharding::{CpuAllocation, ShardingConfig};
 use super::system::{CompressionConfig, MemoryPoolConfig, PartitionConfig};
 use crate::archiver::ArchiverKindType;
 use crate::configs::COMPONENT;
@@ -34,6 +35,7 @@ use iggy_common::CompressionAlgorithm;
 use iggy_common::IggyExpiry;
 use iggy_common::MaxTopicSize;
 use iggy_common::Validatable;
+use std::thread::available_parallelism;
 use tracing::error;
 
 impl Validatable<ConfigError> for ServerConfig {
@@ -68,6 +70,12 @@ impl Validatable<ConfigError> for ServerConfig {
         self.telemetry.validate().with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to validate telemetry config")
         })?;
+        self.system
+            .sharding
+            .validate()
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to validate sharding config")
+            })?;
 
         let topic_size = match self.system.topic.max_size {
             MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
@@ -352,3 +360,43 @@ impl Validatable<ConfigError> for MemoryPoolConfig {
         Ok(())
     }
 }
+
+impl Validatable<ConfigError> for ShardingConfig {
+    fn validate(&self) -> Result<(), ConfigError> {
+        let available_cpus = available_parallelism()
+            .expect("Failed to get number of CPU cores")
+            .get();
+
+        match &self.cpu_allocation {
+            CpuAllocation::All => Ok(()),
+            CpuAllocation::Count(count) => {
+                if *count == 0 {
+                    eprintln!("Invalid sharding configuration: cpu_allocation count cannot be 0");
+                    return Err(ConfigError::InvalidConfiguration);
+                }
+                if *count > available_cpus {
+                    eprintln!(
+                        "Invalid sharding configuration: cpu_allocation count {count} exceeds available CPU cores {available_cpus}"
+                    );
+                    return Err(ConfigError::InvalidConfiguration);
+                }
+                Ok(())
+            }
+            CpuAllocation::Range(start, end) => {
+                if start >= end {
+                    eprintln!(
+                        "Invalid sharding configuration: cpu_allocation range {start}..{end} is invalid (start must be less than end)"
+                    );
+                    return Err(ConfigError::InvalidConfiguration);
+                }
+                if *end > available_cpus {
+                    eprintln!(
+                        "Invalid sharding configuration: cpu_allocation range {start}..{end} exceeds available CPU cores (max: {available_cpus})"
+                    );
+                    return Err(ConfigError::InvalidConfiguration);
+                }
+                Ok(())
+            }
+        }
+    }
+}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2602e3c8..e01f03ec 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -19,7 +19,6 @@
 use std::collections::HashSet;
 use std::rc::Rc;
 use std::sync::Arc;
-use std::thread::available_parallelism;
 
 use anyhow::Result;
 use clap::Parser;
@@ -36,12 +35,13 @@ use server::bootstrap::{
 };
 use server::configs::config_provider::{self};
 use server::configs::server::ServerConfig;
+use server::configs::sharding::CpuAllocation;
 use server::io::fs_utils;
 #[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
 #[cfg(feature = "tokio-console")]
 use server::log::tokio_console::Logging;
-use server::server_error::ServerError;
+use server::server_error::{ConfigError, ServerError};
 use server::shard::IggyShard;
 use server::shard::gate::Barrier;
 use server::state::StateKind;
@@ -122,12 +122,28 @@ fn main() -> Result<(), ServerError> {
     // From this point on, we can use tracing macros to log messages.
     logging.late_init(config.system.get_system_path(), &config.system.logging)?;
 
-    // TODO: Make this configurable from config as a range
-    // for example this instance of Iggy will use cores from 0..4
-    let available_cpus = available_parallelism().expect("Failed to get num of cores");
-    let shards_count = available_cpus.into();
-    let shards_set = 0..shards_count;
-    let (connections, shutdown_handles) = create_shard_connections(shards_set.clone());
+    let shards_set = config.system.sharding.cpu_allocation.to_shard_set();
+
+    match &config.system.sharding.cpu_allocation {
+        CpuAllocation::All => {
+            info!(
+                "Using all available CPU cores ({} shards with affinity)",
+                shards_set.len()
+            );
+        }
+        CpuAllocation::Count(count) => {
+            info!("Using {count} shards with affinity to cores 0..{count}");
+        }
+        CpuAllocation::Range(start, end) => {
+            info!(
+                "Using {} shards with affinity to cores {start}..{end}",
+                end - start
+            );
+        }
+    }
+
+    info!("Starting {} shard(s)", shards_set.len());
+    let (connections, shutdown_handles) = create_shard_connections(&shards_set);
     let barrier = Arc::new(Barrier::new());
     let mut handles = Vec::with_capacity(shards_set.len());
 
@@ -142,7 +158,8 @@ fn main() -> Result<(), ServerError> {
             .name(format!("shard-{id}"))
             .spawn(move || {
                 MemoryPool::init_pool(config.system.clone());
-                let rt = create_shard_executor(HashSet::from([shard_id]));
+                let affinity_set = HashSet::from([shard_id]);
+                let rt = create_shard_executor(affinity_set);
                 rt.block_on(async move {
                     let version = SemanticVersion::current().expect("Invalid version");
                     info!(
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index d4358f66..a8158bc1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -665,13 +665,10 @@ impl IggyShard {
                 })?;
                 Ok(())
             }
-            ShardEvent::UpdatedStream {
-                stream_id,
-                name,
-            } => {
+            ShardEvent::UpdatedStream { stream_id, name } => {
                 self.update_stream_bypass_auth(stream_id, name)?;
                 Ok(())
-            },
+            }
             ShardEvent::UpdatedTopic {
                 stream_id,
                 topic_id,
@@ -689,9 +686,10 @@ impl IggyShard {
                     *compression_algorithm,
                     *max_topic_size,
                     *replication_factor,
-                ).await?;
+                )
+                .await?;
                 Ok(())
-            },
+            }
             ShardEvent::PurgedStream { stream_id: _ } => todo!(),
             ShardEvent::CreatedConsumerGroup {
                 stream_id: _,

Reply via email to