This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dc1be39d fix(server): set CPU affinity before NUMA memory bind (#2511)
4dc1be39d is described below

commit 4dc1be39d29ee77f5c89047c3478c045d2efb8d8
Author: tungtose <[email protected]>
AuthorDate: Tue Dec 23 17:00:02 2025 +0700

    fix(server): set CPU affinity before NUMA memory bind (#2511)
---
 Cargo.toml                          |  2 +-
 core/server/src/bootstrap.rs        |  5 ++---
 core/server/src/configs/sharding.rs | 30 ++++++++++++++++++++++++++++++
 core/server/src/main.rs             |  6 +++++-
 4 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2ccb4b1ea..9d55f094a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -145,7 +145,7 @@ lazy_static = "1.5.0"
 log = "0.4.29"
 mimalloc = "0.1"
 mockall = "0.14.0"
-nix = { version = "0.30.1", features = ["fs", "resource"] }
+nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
 nonzero_lit = "0.1.2"
 once_cell = "1.21.3"
 parquet = "=55.2.0"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index af066da2d..60f19a9eb 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -75,7 +75,7 @@ use iggy_common::{
         MIN_USERNAME_LENGTH,
     },
 };
-use std::{collections::HashSet, env, path::Path, sync::Arc};
+use std::{env, path::Path, sync::Arc};
 use tracing::{info, warn};
 
 pub async fn load_streams(
@@ -351,7 +351,7 @@ pub fn create_root_user() -> User {
     User::root(&username, &password)
 }
 
-pub fn create_shard_executor(_cpu_set: HashSet<usize>) -> Runtime {
+pub fn create_shard_executor() -> Runtime {
     // TODO: The event interval tick, could be configured based on the fact
     // How many clients we expect to have connected.
     // This roughly estimates the number of tasks we will create.
@@ -371,7 +371,6 @@ pub fn create_shard_executor(_cpu_set: HashSet<usize>) -> Runtime {
     compio::runtime::RuntimeBuilder::new()
         .with_proactor(proactor.to_owned())
         .event_interval(128)
-        .thread_affinity(_cpu_set)
         .build()
         .unwrap()
 }
diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs
index 4ce31889f..c8f289e4f 100644
--- a/core/server/src/configs/sharding.rs
+++ b/core/server/src/configs/sharding.rs
@@ -21,6 +21,8 @@ use hwlocality::bitmap::SpecializedBitmapRef;
 use hwlocality::cpu::cpuset::CpuSet;
 use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
 use hwlocality::object::types::ObjectType::{self, NUMANode};
+#[cfg(target_os = "linux")]
+use nix::{sched::sched_setaffinity, unistd::Pid};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::collections::HashSet;
 use std::str::FromStr;
@@ -291,6 +293,34 @@ pub struct ShardInfo {
 }
 
 impl ShardInfo {
+    pub fn bind_cpu(&self) -> Result<(), ServerError> {
+        #[cfg(target_os = "linux")]
+        {
+            if self.cpu_set.is_empty() {
+                return Ok(());
+            }
+
+            let mut cpuset = nix::sched::CpuSet::new();
+            for &cpu in &self.cpu_set {
+                cpuset.set(cpu).map_err(|_| ServerError::BindingFailed)?;
+            }
+
+            sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| {
+                tracing::error!("Failed to set CPU affinity: {:?}", e);
+                ServerError::BindingFailed
+            })?;
+
+            info!("Thread bound to CPUs: {:?}", self.cpu_set);
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            tracing::debug!("CPU affinity binding skipped on non-Linux platform");
+        }
+
+        Ok(())
+    }
+
     pub fn bind_memory(&self) -> Result<(), ServerError> {
         if let Some(node_id) = self.numa_node {
             let topology = Topology::new().map_err(|err| ServerError::TopologyDetection {
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 23e7c515d..58c50e33c 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -400,11 +400,15 @@ fn main() -> Result<(), ServerError> {
                 .name(format!("shard-{id}"))
                 .spawn(move || {
                     let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
+                        if let Err(e) = assignment.bind_cpu() {
+                            error!("Failed to bind cpu: {e:?}");
+                        }
+
                         if let Err(e) = assignment.bind_memory() {
                             error!("Failed to bind memory: {e:?}");
                         }
 
-                        let rt = create_shard_executor(assignment.cpu_set);
+                        let rt = create_shard_executor();
                         rt.block_on(async move {
                             let builder = IggyShard::builder();
                             let shard = builder

Reply via email to