This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a5d569450 feat(server): NUMA awareness (#2412)
a5d569450 is described below
commit a5d569450ee34441be997786046c7a30785e11f2
Author: tungtose <[email protected]>
AuthorDate: Mon Dec 15 16:02:20 2025 +0700
feat(server): NUMA awareness (#2412)
This PR addresses #2387
---
.github/actions/rust/pre-merge/action.yml | 4 +-
.../actions/utils/setup-rust-with-cache/action.yml | 13 +
Cargo.lock | 75 +++
Cargo.toml | 1 +
DEPENDENCIES.md | 6 +
Dockerfile | 2 +
bdd/rust/Dockerfile | 6 +
core/configs/server.toml | 5 +-
core/server/Cargo.toml | 7 +
core/server/Dockerfile | 10 +
core/server/src/bootstrap.rs | 14 +-
core/server/src/configs/sharding.rs | 512 ++++++++++++++++++++-
core/server/src/configs/validators.rs | 13 +
core/server/src/main.rs | 43 +-
core/server/src/server_error.rs | 33 +-
15 files changed, 702 insertions(+), 42 deletions(-)
diff --git a/.github/actions/rust/pre-merge/action.yml
b/.github/actions/rust/pre-merge/action.yml
index 346a8c7fa..bf3bcb920 100644
--- a/.github/actions/rust/pre-merge/action.yml
+++ b/.github/actions/rust/pre-merge/action.yml
@@ -130,7 +130,7 @@ runs:
shell: bash
- name: Install musl tools for aarch64-musl
- if: inputs.task == 'build-aarch64-musl'
+ if: inputs.task == 'build-aarch64-musl' && runner.os == 'Linux'
run: |
sudo apt-get update && sudo apt-get install -y musl-tools
rustup target add aarch64-unknown-linux-musl
@@ -144,6 +144,8 @@ runs:
# Disable GCC outline atomics to avoid undefined __aarch64_ldadd4_sync
# references when linking dbus-sys (required by keyring crate)
CFLAGS: "-mno-outline-atomics"
+ PKG_CONFIG_ALLOW_CROSS: "1"
+ PKG_CONFIG_ALL_STATIC: "1"
# macOS builds
- name: Build macOS aarch64
diff --git a/.github/actions/utils/setup-rust-with-cache/action.yml
b/.github/actions/utils/setup-rust-with-cache/action.yml
index a1bc0904e..c2f362b5b 100644
--- a/.github/actions/utils/setup-rust-with-cache/action.yml
+++ b/.github/actions/utils/setup-rust-with-cache/action.yml
@@ -37,6 +37,19 @@ inputs:
runs:
using: "composite"
steps:
+ - name: Install system dependencies
+ if: runner.os == 'Linux'
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y libhwloc-dev pkg-config libudev-dev
+ shell: bash
+
+ - name: Install system dependencies (macOS)
+ if: runner.os == 'macOS'
+ run: |
+ brew install hwloc pkg-config
+ shell: bash
+
- name: Setup Rust toolchain
run: |
echo "Using Rust toolchain from rust-toolchain.toml: $(rustup show)"
diff --git a/Cargo.lock b/Cargo.lock
index 839063828..351efd920 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -892,6 +892,15 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
+[[package]]
+name = "autotools"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf"
+dependencies = [
+ "cc",
+]
+
[[package]]
name = "aws-lc-rs"
version = "1.15.1"
@@ -4077,6 +4086,40 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
+[[package]]
+name = "hwlocality"
+version = "1.0.0-alpha.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f50d4312588681f6d07e6009728bf5c777e1f674d43a3ad91d15f6795a0db965"
+dependencies = [
+ "arrayvec",
+ "bitflags 2.10.0",
+ "derive_more 2.1.0",
+ "errno",
+ "hwlocality-sys",
+ "libc",
+ "strum 0.27.2",
+ "thiserror 2.0.17",
+ "windows-sys 0.61.2",
+]
+
+[[package]]
+name = "hwlocality-sys"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98f0a5f1ab804ba087ef715ce5cd4feaab6222a2ef6b3e9d5ae1536e90393728"
+dependencies = [
+ "autotools",
+ "cmake",
+ "flate2",
+ "libc",
+ "pkg-config",
+ "sha3",
+ "tar",
+ "ureq",
+ "windows-sys 0.61.2",
+]
+
[[package]]
name = "hyper"
version = "1.8.1"
@@ -5143,6 +5186,15 @@ dependencies = [
"rayon",
]
+[[package]]
+name = "keccak"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
+dependencies = [
+ "cpufeatures",
+]
+
[[package]]
name = "keyring"
version = "3.6.3"
@@ -8172,6 +8224,7 @@ dependencies = [
"futures",
"hash32 1.0.0",
"human-repr",
+ "hwlocality",
"iggy_common",
"jsonwebtoken",
"lending-iterator",
@@ -8239,6 +8292,16 @@ dependencies = [
"digest",
]
+[[package]]
+name = "sha3"
+version = "0.10.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60"
+dependencies = [
+ "digest",
+ "keccak",
+]
+
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -8855,6 +8918,17 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
+[[package]]
+name = "tar"
+version = "0.4.44"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a"
+dependencies = [
+ "filetime",
+ "libc",
+ "xattr",
+]
+
[[package]]
name = "tempfile"
version = "3.23.0"
@@ -9737,6 +9811,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39cb1dbab692d82a977c0392ffac19e188bd9186a9f32806f0aaa859d75585a"
dependencies = [
"base64 0.22.1",
+ "flate2",
"log",
"percent-encoding",
"rustls",
diff --git a/Cargo.toml b/Cargo.toml
index 87ecac08c..f2491ac6d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -132,6 +132,7 @@ futures = "0.3.31"
futures-util = "0.3.31"
human-repr = "1.1.0"
humantime = "2.3.0"
+hwlocality = "1.0.0-alpha.11"
iceberg = "=0.6.0"
iceberg-catalog-rest = "=0.6.0"
iggy = { path = "core/sdk", version = "0.8.1-edge.1" }
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 51b3e5d83..1ba074063 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -73,6 +73,7 @@ atomic: 0.6.1, "Apache-2.0 OR MIT",
atomic-polyfill: 1.0.3, "Apache-2.0 OR MIT",
atomic-waker: 1.1.2, "Apache-2.0 OR MIT",
autocfg: 1.5.0, "Apache-2.0 OR MIT",
+autotools: 0.2.7, "MIT",
aws-lc-rs: 1.15.1, "(Apache-2.0 OR ISC) AND ISC",
aws-lc-sys: 0.34.0, "(Apache-2.0 OR ISC) AND ISC AND OpenSSL",
axum: 0.8.7, "MIT",
@@ -367,6 +368,8 @@ httparse: 1.10.1, "Apache-2.0 OR MIT",
httpdate: 1.0.3, "Apache-2.0 OR MIT",
human-repr: 1.1.0, "MIT",
humantime: 2.3.0, "Apache-2.0 OR MIT",
+hwlocality: 1.0.0-alpha.11, "MIT",
+hwlocality-sys: 0.6.1, "MIT",
hyper: 1.8.1, "MIT",
hyper-named-pipe: 0.1.0, "Apache-2.0",
hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT",
@@ -436,6 +439,7 @@ journal: 0.1.0, "Apache-2.0",
js-sys: 0.3.83, "Apache-2.0 OR MIT",
jsonwebtoken: 10.2.0, "MIT",
jwalk: 0.8.1, "MIT",
+keccak: 0.1.5, "Apache-2.0 OR MIT",
keyring: 3.6.3, "Apache-2.0 OR MIT",
kqueue: 1.1.1, "MIT",
kqueue-sys: 1.0.4, "MIT",
@@ -720,6 +724,7 @@ serial_test_derive: 3.2.0, "MIT",
server: 0.6.1-edge.1, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
+sha3: 0.10.8, "Apache-2.0 OR MIT",
sharded-slab: 0.1.7, "MIT",
shlex: 1.3.0, "Apache-2.0 OR MIT",
signal-hook-registry: 1.4.7, "Apache-2.0 OR MIT",
@@ -771,6 +776,7 @@ sysinfo: 0.34.2, "MIT",
sysinfo: 0.37.2, "MIT",
tagptr: 0.2.0, "Apache-2.0 OR MIT",
tap: 1.0.1, "MIT",
+tar: 0.4.44, "Apache-2.0 OR MIT",
tempfile: 3.23.0, "Apache-2.0 OR MIT",
terminal_size: 0.4.3, "Apache-2.0 OR MIT",
termtree: 0.5.1, "MIT",
diff --git a/Dockerfile b/Dockerfile
index 588cebdb2..f4ad3304b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -24,6 +24,8 @@ RUN apt-get update && apt-get install -y \
build-essential \
pkg-config \
libssl-dev \
+ libhwloc-dev \
+    libudev-dev \
&& rm -rf /var/lib/apt/lists/*
COPY . .
RUN cargo build --bin iggy --release
diff --git a/bdd/rust/Dockerfile b/bdd/rust/Dockerfile
index 0f1032f2f..288fefd10 100644
--- a/bdd/rust/Dockerfile
+++ b/bdd/rust/Dockerfile
@@ -20,6 +20,12 @@ FROM rust:${RUST_VERSION}
WORKDIR /app
+RUN apt-get update && apt-get install -y \
+ libhwloc-dev \
+ libudev-dev \
+ pkg-config \
+ && rm -rf /var/lib/apt/lists/*
+
# Copy everything
COPY . .
diff --git a/core/configs/server.toml b/core/configs/server.toml
index b1a25b61d..9e075eff4 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -556,7 +556,10 @@ ports = { tcp = 8091, quic = 8081, http = 3001, websocket
= 8093 }
# - "all": Use all available CPU cores (default)
# - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3)
# - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7
-cpu_allocation = "all"
+# - numa settings:
+# + "numa:auto": Use all available NUMA nodes and cores
+# + "numa:nodes=0,1;cores=4;no_ht=true": Use NUMA nodes 0 and 1, each node
uses 4 cores, with hyperthreading disabled
+cpu_allocation = "numa:auto"
[websocket]
enabled = true
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index bb375bdf5..bbd5e699e 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -71,6 +71,7 @@ flume = { workspace = true }
futures = { workspace = true }
hash32 = "1.0.0"
human-repr = { workspace = true }
+
iggy_common = { workspace = true }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
lending-iterator = "0.1.7"
@@ -127,6 +128,12 @@ twox-hash = { workspace = true }
ulid = "1.2.1"
uuid = { workspace = true }
+[target.'cfg(not(target_env = "musl"))'.dependencies]
+hwlocality = { workspace = true }
+
+[target.'cfg(target_env = "musl")'.dependencies]
+hwlocality = { workspace = true, features = ["vendored"] }
+
[build-dependencies]
figment = { version = "0.10.19", features = ["json", "toml", "env"] }
vergen-git2 = { version = "1.0.7", features = [
diff --git a/core/server/Dockerfile b/core/server/Dockerfile
index 52f9100d4..3a0ad7f94 100644
--- a/core/server/Dockerfile
+++ b/core/server/Dockerfile
@@ -97,6 +97,11 @@ ARG TARGETPLATFORM
ARG PREBUILT_IGGY_SERVER
ARG PREBUILT_IGGY_CLI
WORKDIR /app
+RUN apt-get update && apt-get install -y \
+ libhwloc-dev \
+ libudev-dev \
+ pkg-config \
+ && rm -rf /var/lib/apt/lists/*
COPY --from=prebuilt /out/iggy-server /usr/local/bin/iggy-server
COPY --from=prebuilt /out/iggy /usr/local/bin/iggy
RUN echo "═══════════════════════════════════════════════════════════════" && \
@@ -120,6 +125,11 @@ ARG TARGETPLATFORM
ARG PROFILE=release
ARG LIBC=musl
WORKDIR /app
+RUN apt-get update && apt-get install -y \
+ libhwloc15 \
+ libudev1 \
+ pkg-config \
+ && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/iggy-server /usr/local/bin/iggy-server
COPY --from=builder /app/iggy /usr/local/bin/iggy
RUN echo "═══════════════════════════════════════════════════════════════" && \
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 3f95da049..af066da2d 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -23,6 +23,7 @@ use crate::{
configs::{
cache_indexes::CacheIndexesConfig,
server::ServerConfig,
+ sharding::ShardInfo,
system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
},
io::fs_utils::{self, DirEntry},
@@ -250,13 +251,16 @@ pub fn load_users(state: impl IntoIterator<Item =
UserState>) -> Users {
}
pub fn create_shard_connections(
- shards_set: &HashSet<usize>,
+ shard_assignment: &[ShardInfo],
) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
- let shards_count = shards_set.len();
-
// Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU
core numbers
- let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count)
- .map(|idx| ShardConnector::new(idx as u16))
+ let connectors: Vec<ShardConnector<ShardFrame>> = shard_assignment
+ .iter()
+ .enumerate()
+ .map(|(idx, _assignment)| {
+ // let cpu_id = assignment.cpu_set.iter().next().unwrap_or(&idx);
+ ShardConnector::new(idx as u16)
+ })
.collect();
let shutdown_handles = connectors
diff --git a/core/server/src/configs/sharding.rs
b/core/server/src/configs/sharding.rs
index aa8214f6a..4ce31889f 100644
--- a/core/server/src/configs/sharding.rs
+++ b/core/server/src/configs/sharding.rs
@@ -16,10 +16,19 @@
* under the License.
*/
+use hwlocality::Topology;
+use hwlocality::bitmap::SpecializedBitmapRef;
+use hwlocality::cpu::cpuset::CpuSet;
+use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
+use hwlocality::object::types::ObjectType::{self, NUMANode};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashSet;
use std::str::FromStr;
+use std::sync::Arc;
use std::thread::available_parallelism;
+use tracing::info;
+
+use crate::server_error::ServerError;
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct ShardingConfig {
@@ -33,21 +42,440 @@ pub enum CpuAllocation {
All,
Count(usize),
Range(usize, usize),
+ NumaAware(NumaConfig),
+}
+
+/// NUMA specific configuration
+#[derive(Debug, Clone, PartialEq, Default)]
+pub struct NumaConfig {
+ /// Which NUMA nodes to use (empty = auto-detect all)
+ pub nodes: Vec<usize>,
+ /// Cores per node to use (0 = use all available)
+ pub cores_per_node: usize,
+ /// skip hyperthread sibling
+ pub avoid_hyperthread: bool,
+}
+
+impl NumaConfig {
+ pub fn validate(&self, topology: &NumaTopology) -> Result<(), ServerError>
{
+ let available_nodes = topology.node_count;
+
+ if available_nodes == 0 {
+ return Err(ServerError::NoNumaNodes);
+ }
+
+ for &node in &self.nodes {
+ if node >= available_nodes {
+ return Err(ServerError::InvalidNode {
+ requested: node,
+ available: available_nodes,
+ });
+ }
+ }
+
+ // Validate cores per node
+ if self.cores_per_node > 0 {
+ for &node in &self.nodes {
+ let available_cores = if self.avoid_hyperthread {
+ topology.physical_cores_for_node(node)
+ } else {
+ topology.logical_cores_for_node(node)
+ };
+
+ info!(
+ "core_per_node: {}, available_cores: {}",
+ self.cores_per_node, available_cores
+ );
+
+ if self.cores_per_node > available_cores {
+ return Err(ServerError::InsufficientCores {
+ requested: self.cores_per_node,
+ available: available_cores,
+ node,
+ });
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ // pub fn or_fallback(&self, topology: &NumaTopology) -> CpuAllocation {
+ // if topology.node_count < 2 {
+ // tracing::warn!(
+ // "NUMA requested but only {} node detected, falling back to
count",
+ // topology.node_count
+ // );
+ // }
+ //
+ // todo!()
+ // }
}
impl CpuAllocation {
- pub fn to_shard_set(&self) -> HashSet<usize> {
- match self {
+ fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
+ let params = s
+ .strip_prefix("numa:")
+ .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;
+
+ if params == "auto" {
+ return Ok(CpuAllocation::NumaAware(NumaConfig {
+ nodes: vec![],
+ cores_per_node: 0,
+ avoid_hyperthread: true,
+ }));
+ }
+
+ let mut nodes = Vec::new();
+ let mut cores_per_node = 0;
+ let mut avoid_hyperthread = true;
+
+ for param in params.split(';') {
+ let kv: Vec<&str> = param.split('=').collect();
+ if kv.len() != 2 {
+ return Err(format!(
+ "Invalid NUMA parameter: '{param}', expected 'key=value' pairs or 'auto'"
+ ));
+ }
+
+ match kv[0] {
+ "nodes" => {
+ nodes = kv[1]
+ .split(',')
+ .map(|n| {
+ n.parse::<usize>()
+ .map_err(|_| format!("Invalid node number:
{n}"))
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+ }
+ "cores" => {
+ cores_per_node = kv[1]
+ .parse::<usize>()
+ .map_err(|_| format!("Invalid cores value: {}",
kv[1]))?;
+ }
+ "no_ht" => {
+ avoid_hyperthread = kv[1]
+ .parse::<bool>()
+ .map_err(|_| format!("Invalid no ht value: {}",
kv[1]))?;
+ }
+ _ => {
+ return Err(format!(
+ "Unknown NUMA parameter: {}, example:
numa:nodes=0;cores=4;no_ht=true",
+ kv[0]
+ ));
+ }
+ }
+ }
+
+ Ok(CpuAllocation::NumaAware(NumaConfig {
+ nodes,
+ cores_per_node,
+ avoid_hyperthread,
+ }))
+ }
+}
+
+#[derive(Debug)]
+pub struct NumaTopology {
+ topology: Topology,
+ node_count: usize,
+ physical_cores_per_node: Vec<usize>,
+ logical_cores_per_node: Vec<usize>,
+}
+
+impl NumaTopology {
+ pub fn detect() -> Result<NumaTopology, ServerError> {
+ let topology =
+ Topology::new().map_err(|e| ServerError::TopologyDetection { msg:
e.to_string() })?;
+
+ let numa_nodes: Vec<_> =
topology.objects_with_type(NUMANode).collect();
+
+ let node_count = numa_nodes.len();
+
+ if node_count == 0 {
+ return Err(ServerError::NoNumaNodes);
+ }
+
+ let mut physical_cores_per_node = Vec::new();
+ let mut logical_cores_per_node = Vec::new();
+
+ for node in numa_nodes {
+ let cpuset = node.cpuset().ok_or(ServerError::TopologyDetection {
+ msg: "NUMA node has no CPU set".to_string(),
+ })?;
+
+ let logical_cores = cpuset.weight().unwrap_or(0);
+
+ let physical_cores = topology
+ .objects_with_type(ObjectType::Core)
+ .filter(|core| {
+ if let Some(core_cpuset) = core.cpuset() {
+ !(cpuset & core_cpuset).is_empty()
+ } else {
+ false
+ }
+ })
+ .count();
+
+ physical_cores_per_node.push(physical_cores);
+ logical_cores_per_node.push(logical_cores);
+ }
+
+ Ok(Self {
+ topology,
+ node_count,
+ physical_cores_per_node,
+ logical_cores_per_node,
+ })
+ }
+
+ pub fn physical_cores_for_node(&self, node: usize) -> usize {
+ self.physical_cores_per_node.get(node).copied().unwrap_or(0)
+ }
+
+ pub fn logical_cores_for_node(&self, node: usize) -> usize {
+ self.logical_cores_per_node.get(node).copied().unwrap_or(0)
+ }
+
+ fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet {
+ let mut physical_cpuset = CpuSet::new();
+ for core in self.topology.objects_with_type(ObjectType::Core) {
+ if let Some(core_cpuset) = core.cpuset() {
+ let intersection = node_cpuset.clone() & core_cpuset;
+ if !intersection.is_empty() {
+ // Take the minimum (first) CPU ID for consistency
+ if let Some(first_cpu) = intersection.iter_set().min() {
+ physical_cpuset.set(first_cpu)
+ }
+ }
+ }
+ }
+ physical_cpuset
+ }
+
+ /// Get CPU set for a NUMA node
+ fn get_cpuset_for_node(
+ &self,
+ node_id: usize,
+ avoid_hyperthread: bool,
+ ) -> Result<CpuSet, ServerError> {
+ let node = self
+ .topology
+ .objects_with_type(ObjectType::NUMANode)
+ .nth(node_id)
+ .ok_or(ServerError::InvalidNode {
+ requested: node_id,
+ available: self.node_count,
+ })?;
+
+ let cpuset_ref = node.cpuset().ok_or(ServerError::TopologyDetection {
+ msg: format!("Node {} has no CPU set", node_id),
+ })?;
+
+ let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref);
+
+ if avoid_hyperthread {
+ Ok(self.filter_physical_cores(cpuset))
+ } else {
+ Ok(cpuset)
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct ShardInfo {
+ /// CPUs this shard should use
+ pub cpu_set: HashSet<usize>,
+ /// NUMA node
+ pub numa_node: Option<usize>,
+}
+
+impl ShardInfo {
+ pub fn bind_memory(&self) -> Result<(), ServerError> {
+ if let Some(node_id) = self.numa_node {
+ let topology = Topology::new().map_err(|err|
ServerError::TopologyDetection {
+ msg: err.to_string(),
+ })?;
+
+ let node = topology
+ .objects_with_type(ObjectType::NUMANode)
+ .nth(node_id)
+ .ok_or(ServerError::InvalidNode {
+ requested: node_id,
+ available:
topology.objects_with_type(ObjectType::NUMANode).count(),
+ })?;
+
+ if let Some(nodeset) = node.nodeset() {
+ topology
+ .bind_memory(
+ nodeset,
+ MemoryBindingPolicy::Bind,
+ MemoryBindingFlags::THREAD |
MemoryBindingFlags::STRICT,
+ )
+ .map_err(|err| {
+ tracing::error!("Failed to bind memory {:?}", err);
+ ServerError::BindingFailed
+ })?;
+
+ info!("Memory bound to NUMA node {node_id}");
+ }
+ }
+
+ Ok(())
+ }
+}
+
+pub struct ShardAllocator {
+ allocation: CpuAllocation,
+ topology: Option<Arc<NumaTopology>>,
+}
+
+impl ShardAllocator {
+ pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator,
ServerError> {
+ let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) {
+ let numa_topology = NumaTopology::detect()?;
+
+ Some(Arc::new(numa_topology))
+ } else {
+ None
+ };
+
+ Ok(Self {
+ allocation: allocation.clone(),
+ topology,
+ })
+ }
+
+ pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, ServerError> {
+ match &self.allocation {
CpuAllocation::All => {
let available_cpus = available_parallelism()
- .expect("Failed to get num of cores")
+ .map_err(|err| ServerError::Other {
+ msg: format!("Failed to get available_parallelism:
{:?}", err),
+ })?
.get();
- (0..available_cpus).collect()
+
+ let shard_assignments: Vec<_> = (0..available_cpus)
+ .map(|cpu_id| ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: None,
+ })
+ .collect();
+
+ info!(
+ "Using all available CPU cores ({} shards with affinity)",
+ shard_assignments.len()
+ );
+
+ Ok(shard_assignments)
+ }
+ CpuAllocation::Count(count) => {
+ let shard_assignments = (0..*count)
+ .map(|cpu_id| ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: None,
+ })
+ .collect();
+
+ info!("Using {count} shards with affinity to cores
0..{count}");
+
+ Ok(shard_assignments)
+ }
+ CpuAllocation::Range(start, end) => {
+ let shard_assignments = (*start..*end)
+ .map(|cpu_id| ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: None,
+ })
+ .collect();
+
+ info!(
+ "Using {} shards with affinity to cores {start}..{end}",
+ end - start
+ );
+
+ Ok(shard_assignments)
+ }
+ CpuAllocation::NumaAware(numa_config) => {
+ let topology =
self.topology.as_ref().ok_or(ServerError::NoTopology)?;
+ self.compute_numa_assignments(topology, numa_config)
}
- CpuAllocation::Count(count) => (0..*count).collect(),
- CpuAllocation::Range(start, end) => (*start..*end).collect(),
}
}
+
+ fn compute_numa_assignments(
+ &self,
+ topology: &NumaTopology,
+ numa: &NumaConfig,
+ ) -> Result<Vec<ShardInfo>, ServerError> {
+ // Determine which nodes to use
+ let nodes = if numa.nodes.is_empty() {
+ // Auto: use all nodes
+ (0..topology.node_count).collect()
+ } else {
+ numa.nodes.clone()
+ };
+
+ // Determine cores per node
+ let cores_per_node = if numa.cores_per_node == 0 {
+ // Auto: use all available
+ if numa.avoid_hyperthread {
+ topology.physical_cores_for_node(nodes[0])
+ } else {
+ topology.logical_cores_for_node(nodes[0])
+ }
+ } else {
+ numa.cores_per_node
+ };
+
+ let mut shard_infos = Vec::new();
+
+ let node_cpus: Vec<Vec<usize>> = nodes
+ .iter()
+ .map(|&node_id| {
+ let cpuset = topology.get_cpuset_for_node(node_id,
numa.avoid_hyperthread)?;
+ Ok(cpuset.iter_set().map(usize::from).collect())
+ })
+ .collect::<Result<_, ServerError>>()?;
+
+ // For each node, create one shard per core (thread-per-core)
+ for (idx, &node_id) in nodes.iter().enumerate() {
+ let available_cpus = &node_cpus[idx];
+
+ // Take first core_per_node CPUs from this node
+ let cores_to_use: Vec<usize> = available_cpus
+ .iter()
+ .take(cores_per_node)
+ .copied()
+ .collect();
+
+ if cores_to_use.len() < cores_per_node {
+ return Err(ServerError::InsufficientCores {
+ requested: cores_per_node,
+ available: available_cpus.len(),
+ node: node_id,
+ });
+ }
+
+ // Create one shard per core
+ for cpu_id in cores_to_use {
+ shard_infos.push(ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: Some(node_id),
+ });
+ }
+ }
+
+ info!(
+ "Using {} shards with {} NUMA node, {} cores per node, and avoid
hyperthread {}",
+ shard_infos.len(),
+ nodes.len(),
+ cores_per_node,
+ numa.avoid_hyperthread
+ );
+
+ Ok(shard_infos)
+ }
}
impl FromStr for CpuAllocation {
@@ -56,6 +484,7 @@ impl FromStr for CpuAllocation {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"all" => Ok(CpuAllocation::All),
+ s if s.starts_with("numa:") => Self::parse_numa(s),
s if s.contains("..") => {
let parts: Vec<&str> = s.split("..").collect();
if parts.len() != 2 {
@@ -90,6 +519,25 @@ impl Serialize for CpuAllocation {
CpuAllocation::Range(start, end) => {
serializer.serialize_str(&format!("{start}..{end}"))
}
+ CpuAllocation::NumaAware(numa) => {
+ if numa.nodes.is_empty() && numa.cores_per_node == 0 {
+ serializer.serialize_str("numa:auto")
+ } else {
+ let nodes_str = numa
+ .nodes
+ .iter()
+ .map(|n| n.to_string())
+ .collect::<Vec<_>>()
+ .join(",");
+
+ let full_str = format!(
+ "numa:nodes={};cores={};no_ht={}",
+ nodes_str, numa.cores_per_node, numa.avoid_hyperthread
+ );
+
+ serializer.serialize_str(&full_str)
+ }
+ }
}
}
}
@@ -114,3 +562,55 @@ impl<'de> Deserialize<'de> for CpuAllocation {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_all() {
+ assert_eq!(CpuAllocation::from_str("all").unwrap(),
CpuAllocation::All);
+ }
+
+ #[test]
+ fn test_parse_count() {
+ assert_eq!(
+ CpuAllocation::from_str("4").unwrap(),
+ CpuAllocation::Count(4)
+ );
+ }
+
+ #[test]
+ fn test_parse_range() {
+ assert_eq!(
+ CpuAllocation::from_str("2..8").unwrap(),
+ CpuAllocation::Range(2, 8)
+ );
+ }
+
+ #[test]
+ fn test_parse_numa_auto() {
+ let result = CpuAllocation::from_str("numa:auto").unwrap();
+ match result {
+ CpuAllocation::NumaAware(numa) => {
+ assert!(numa.nodes.is_empty());
+ assert_eq!(numa.cores_per_node, 0);
+ assert!(numa.avoid_hyperthread);
+ }
+ _ => panic!("Expected NumaAware"),
+ }
+ }
+
+ #[test]
+ fn test_parse_numa_explicit() {
+ let result =
CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap();
+ match result {
+ CpuAllocation::NumaAware(numa) => {
+ assert_eq!(numa.nodes, vec![0, 1]);
+ assert_eq!(numa.cores_per_node, 4);
+ assert!(numa.avoid_hyperthread);
+ }
+ _ => panic!("Expected NumaAware"),
+ }
+ }
+}
diff --git a/core/server/src/configs/validators.rs
b/core/server/src/configs/validators.rs
index ab0547dea..77e792023 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -25,6 +25,7 @@ use super::sharding::{CpuAllocation, ShardingConfig};
use super::system::{CompressionConfig, PartitionConfig};
use crate::configs::COMPONENT;
use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig,
ServerConfig};
+use crate::configs::sharding::NumaTopology;
use crate::configs::system::SegmentConfig;
use crate::streaming::segments::*;
use err_trail::ErrContext;
@@ -321,6 +322,18 @@ impl Validatable<ConfigurationError> for ShardingConfig {
}
Ok(())
}
+ CpuAllocation::NumaAware(numa_config) => match
NumaTopology::detect() {
+ // TODO: deduplicate this validation; it is already performed by
the shard allocator
+ Ok(topology) => numa_config.validate(&topology).map_err(|e| {
+ eprintln!("Invalid NUMA configuration: {}", e);
+ ConfigurationError::InvalidConfigurationValue
+ }),
+ Err(e) => {
+ eprintln!("Failed to detect NUMA topology: {}", e);
+ eprintln!("NUMA allocation requested but system doesn't
support it");
+ Err(ConfigurationError::InvalidConfigurationValue)
+ }
+ },
}
}
}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 4d9d0f08e..e2d47f79c 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,7 +30,7 @@ use server::bootstrap::{
create_directories, create_shard_connections, create_shard_executor,
load_config, load_streams,
load_users, resolve_persister, update_system_info,
};
-use server::configs::sharding::CpuAllocation;
+use server::configs::sharding::ShardAllocator;
use server::diagnostics::{print_io_uring_permission_info,
print_locked_memory_limit_info};
use server::io::fs_utils;
use server::log::logger::Logging;
@@ -49,7 +49,6 @@ use server::streaming::diagnostics::metrics::Metrics;
use server::streaming::storage::SystemStorage;
use server::streaming::utils::ptr::EternalPtr;
use server::versioning::SemanticVersion;
-use std::collections::HashSet;
use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::str::FromStr;
@@ -284,24 +283,8 @@ fn main() -> Result<(), ServerError> {
let users = load_users(users_state.into_values());
// ELEVENTH DISCRETE LOADING STEP.
- let shards_set = config.system.sharding.cpu_allocation.to_shard_set();
- match &config.system.sharding.cpu_allocation {
- CpuAllocation::All => {
- info!(
- "Using all available CPU cores ({} shards with affinity)",
- shards_set.len()
- );
- }
- CpuAllocation::Count(count) => {
- info!("Using {count} shards with affinity to cores
0..{count}");
- }
- CpuAllocation::Range(start, end) => {
- info!(
- "Using {} shards with affinity to cores {start}..{end}",
- end - start
- );
- }
- }
+ let shard_allocator =
ShardAllocator::new(&config.system.sharding.cpu_allocation)?;
+ let shard_assignment = shard_allocator.to_shard_assignments()?;
#[cfg(feature = "disable-mimalloc")]
warn!("Using default system allocator because code was build with
`disable-mimalloc` feature");
@@ -313,9 +296,9 @@ fn main() -> Result<(), ServerError> {
let metrics = Metrics::init();
// TWELFTH DISCRETE LOADING STEP.
- let shards_count = shards_set.len();
- info!("Starting {} shard(s)", shards_count);
- let (connections, shutdown_handles) =
create_shard_connections(&shards_set);
+ info!("Starting {} shard(s)", shard_assignment.len());
+ let (connections, shutdown_handles) =
create_shard_connections(&shard_assignment);
+ let shards_count = shard_assignment.len();
let mut handles: Vec<JoinHandle<()>> =
Vec::with_capacity(shards_count);
// Channel for shard completion notifications
@@ -348,7 +331,7 @@ fn main() -> Result<(), ServerError> {
let ns = IggyNamespace::new(stream_id,
topic_id, partition_id);
let shard_id =
ShardId::new(calculate_shard_assignment(
&ns,
- shards_set.len() as u32,
+ shard_assignment.len() as u32,
));
shards_table.insert(ns, shard_id);
}
@@ -358,10 +341,10 @@ fn main() -> Result<(), ServerError> {
}
});
- for (id, cpu_id) in shards_set
+ for (id, assignment) in shard_assignment
.into_iter()
.enumerate()
- .map(|(idx, cpu)| (idx as u16, cpu))
+ .map(|(idx, assignment)| (idx as u16, assignment))
{
let streams = streams.clone();
let shards_table = shards_table.clone();
@@ -412,8 +395,11 @@ fn main() -> Result<(), ServerError> {
.name(format!("shard-{id}"))
.spawn(move || {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
- let affinity_set = HashSet::from([cpu_id]);
- let rt = create_shard_executor(affinity_set);
+ if let Err(e) = assignment.bind_memory() {
+ error!("Failed to bind memory: {e:?}");
+ }
+
+ let rt = create_shard_executor(assignment.cpu_set);
rt.block_on(async move {
let builder = IggyShard::builder();
let shard = builder
@@ -438,6 +424,7 @@ fn main() -> Result<(), ServerError> {
return Err(e.to_string());
}
info!("Shard {id} run completed");
+
Ok(())
})
}));
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index c7e211465..f49e10468 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -23,7 +23,7 @@ use std::array::TryFromSliceError;
use std::io;
error_set!(
- ServerError := ConfigurationError || ArchiverError || ConnectionError ||
LogError || CompatError || QuicError || ShardError
+ ServerError := NumaError || ConfigurationError || ArchiverError ||
ConnectionError || LogError || CompatError || QuicError || ShardError
IoError := {
#[display("IO error")]
@@ -36,6 +36,37 @@ error_set!(
ReadToEndError(ReadError)
}
+ NumaError := {
+ #[display("Failed to detect topology: {}", msg)]
+ TopologyDetection {
+ msg: String
+ },
+
+ #[display("There is no NUMA node on this server")]
+ NoNumaNodes,
+
+ #[display("No Topology")]
+ NoTopology,
+
+ #[display("Binding Failed")]
+ BindingFailed,
+
+ #[display("Insufficient cores on node {}: requested {}, only {}
available", node, requested, available)]
+ InsufficientCores {
+ requested: usize,
+ available: usize,
+ node: usize,
+ },
+
+ #[display("Invalid NUMA node: requested {}, only available {} node",
requested, available)]
+ InvalidNode { requested: usize, available: usize },
+
+ #[display("Other error: {}", msg)]
+ Other {
+ msg: String
+ },
+ }
+
ConfigurationError := {
ConfigurationError(iggy_common::ConfigurationError),
}