This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 4dc1be39d fix(server): set CPU affinity before NUMA memory bind (#2511)
4dc1be39d is described below
commit 4dc1be39d29ee77f5c89047c3478c045d2efb8d8
Author: tungtose <[email protected]>
AuthorDate: Tue Dec 23 17:00:02 2025 +0700
fix(server): set CPU affinity before NUMA memory bind (#2511)
---
Cargo.toml | 2 +-
core/server/src/bootstrap.rs | 5 ++---
core/server/src/configs/sharding.rs | 30 ++++++++++++++++++++++++++++++
core/server/src/main.rs | 6 +++++-
4 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 2ccb4b1ea..9d55f094a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -145,7 +145,7 @@ lazy_static = "1.5.0"
log = "0.4.29"
mimalloc = "0.1"
mockall = "0.14.0"
-nix = { version = "0.30.1", features = ["fs", "resource"] }
+nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
nonzero_lit = "0.1.2"
once_cell = "1.21.3"
parquet = "=55.2.0"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index af066da2d..60f19a9eb 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -75,7 +75,7 @@ use iggy_common::{
MIN_USERNAME_LENGTH,
},
};
-use std::{collections::HashSet, env, path::Path, sync::Arc};
+use std::{env, path::Path, sync::Arc};
use tracing::{info, warn};
pub async fn load_streams(
@@ -351,7 +351,7 @@ pub fn create_root_user() -> User {
User::root(&username, &password)
}
-pub fn create_shard_executor(_cpu_set: HashSet<usize>) -> Runtime {
+pub fn create_shard_executor() -> Runtime {
// TODO: The event interval tick, could be configured based on the fact
// How many clients we expect to have connected.
// This roughly estimates the number of tasks we will create.
@@ -371,7 +371,6 @@ pub fn create_shard_executor(_cpu_set: HashSet<usize>) -> Runtime {
compio::runtime::RuntimeBuilder::new()
.with_proactor(proactor.to_owned())
.event_interval(128)
- .thread_affinity(_cpu_set)
.build()
.unwrap()
}
diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs
index 4ce31889f..c8f289e4f 100644
--- a/core/server/src/configs/sharding.rs
+++ b/core/server/src/configs/sharding.rs
@@ -21,6 +21,8 @@ use hwlocality::bitmap::SpecializedBitmapRef;
use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
use hwlocality::object::types::ObjectType::{self, NUMANode};
+#[cfg(target_os = "linux")]
+use nix::{sched::sched_setaffinity, unistd::Pid};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashSet;
use std::str::FromStr;
@@ -291,6 +293,34 @@ pub struct ShardInfo {
}
impl ShardInfo {
+ pub fn bind_cpu(&self) -> Result<(), ServerError> {
+ #[cfg(target_os = "linux")]
+ {
+ if self.cpu_set.is_empty() {
+ return Ok(());
+ }
+
+ let mut cpuset = nix::sched::CpuSet::new();
+ for &cpu in &self.cpu_set {
+ cpuset.set(cpu).map_err(|_| ServerError::BindingFailed)?;
+ }
+
+ sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| {
+ tracing::error!("Failed to set CPU affinity: {:?}", e);
+ ServerError::BindingFailed
+ })?;
+
+ info!("Thread bound to CPUs: {:?}", self.cpu_set);
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+ tracing::debug!("CPU affinity binding skipped on non-Linux platform");
+ }
+
+ Ok(())
+ }
+
pub fn bind_memory(&self) -> Result<(), ServerError> {
if let Some(node_id) = self.numa_node {
let topology = Topology::new().map_err(|err| ServerError::TopologyDetection {
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 23e7c515d..58c50e33c 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -400,11 +400,15 @@ fn main() -> Result<(), ServerError> {
.name(format!("shard-{id}"))
.spawn(move || {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
+ if let Err(e) = assignment.bind_cpu() {
+ error!("Failed to bind cpu: {e:?}");
+ }
+
if let Err(e) = assignment.bind_memory() {
error!("Failed to bind memory: {e:?}");
}
- let rt = create_shard_executor(assignment.cpu_set);
+ let rt = create_shard_executor();
rt.block_on(async move {
let builder = IggyShard::builder();
let shard = builder