This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a5d569450 feat(server): NUMA awareness (#2412)
a5d569450 is described below

commit a5d569450ee34441be997786046c7a30785e11f2
Author: tungtose <[email protected]>
AuthorDate: Mon Dec 15 16:02:20 2025 +0700

    feat(server): NUMA awareness (#2412)
    
    This PR addresses #2387
---
 .github/actions/rust/pre-merge/action.yml          |   4 +-
 .../actions/utils/setup-rust-with-cache/action.yml |  13 +
 Cargo.lock                                         |  75 +++
 Cargo.toml                                         |   1 +
 DEPENDENCIES.md                                    |   6 +
 Dockerfile                                         |   2 +
 bdd/rust/Dockerfile                                |   6 +
 core/configs/server.toml                           |   5 +-
 core/server/Cargo.toml                             |   7 +
 core/server/Dockerfile                             |  10 +
 core/server/src/bootstrap.rs                       |  14 +-
 core/server/src/configs/sharding.rs                | 512 ++++++++++++++++++++-
 core/server/src/configs/validators.rs              |  13 +
 core/server/src/main.rs                            |  43 +-
 core/server/src/server_error.rs                    |  33 +-
 15 files changed, 702 insertions(+), 42 deletions(-)

diff --git a/.github/actions/rust/pre-merge/action.yml 
b/.github/actions/rust/pre-merge/action.yml
index 346a8c7fa..bf3bcb920 100644
--- a/.github/actions/rust/pre-merge/action.yml
+++ b/.github/actions/rust/pre-merge/action.yml
@@ -130,7 +130,7 @@ runs:
       shell: bash
 
     - name: Install musl tools for aarch64-musl
-      if: inputs.task == 'build-aarch64-musl'
+      if: inputs.task == 'build-aarch64-musl' && runner.os == 'Linux'
       run: |
         sudo apt-get update && sudo apt-get install -y musl-tools
         rustup target add aarch64-unknown-linux-musl
@@ -144,6 +144,8 @@ runs:
         # Disable GCC outline atomics to avoid undefined __aarch64_ldadd4_sync
         # references when linking dbus-sys (required by keyring crate)
         CFLAGS: "-mno-outline-atomics"
+        PKG_CONFIG_ALLOW_CROSS: "1"
+        PKG_CONFIG_ALL_STATIC: "1"
 
     # macOS builds
     - name: Build macOS aarch64
diff --git a/.github/actions/utils/setup-rust-with-cache/action.yml 
b/.github/actions/utils/setup-rust-with-cache/action.yml
index a1bc0904e..c2f362b5b 100644
--- a/.github/actions/utils/setup-rust-with-cache/action.yml
+++ b/.github/actions/utils/setup-rust-with-cache/action.yml
@@ -37,6 +37,19 @@ inputs:
 runs:
   using: "composite"
   steps:
+    - name: Install system dependencies
+      if: runner.os == 'Linux'
+      run: |
+        sudo apt-get update
+        sudo apt-get install -y libhwloc-dev pkg-config libudev-dev
+      shell: bash
+
+    - name: Install system dependencies (macOS)
+      if: runner.os == 'macOS'
+      run: |
+        brew install hwloc pkg-config
+      shell: bash
+
     - name: Setup Rust toolchain
       run: |
         echo "Using Rust toolchain from rust-toolchain.toml: $(rustup show)"
diff --git a/Cargo.lock b/Cargo.lock
index 839063828..351efd920 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -892,6 +892,15 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
 
+[[package]]
+name = "autotools"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf"
+dependencies = [
+ "cc",
+]
+
 [[package]]
 name = "aws-lc-rs"
 version = "1.15.1"
@@ -4077,6 +4086,40 @@ version = "2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
 
+[[package]]
+name = "hwlocality"
+version = "1.0.0-alpha.11"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f50d4312588681f6d07e6009728bf5c777e1f674d43a3ad91d15f6795a0db965"
+dependencies = [
+ "arrayvec",
+ "bitflags 2.10.0",
+ "derive_more 2.1.0",
+ "errno",
+ "hwlocality-sys",
+ "libc",
+ "strum 0.27.2",
+ "thiserror 2.0.17",
+ "windows-sys 0.61.2",
+]
+
+[[package]]
+name = "hwlocality-sys"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "98f0a5f1ab804ba087ef715ce5cd4feaab6222a2ef6b3e9d5ae1536e90393728"
+dependencies = [
+ "autotools",
+ "cmake",
+ "flate2",
+ "libc",
+ "pkg-config",
+ "sha3",
+ "tar",
+ "ureq",
+ "windows-sys 0.61.2",
+]
+
 [[package]]
 name = "hyper"
 version = "1.8.1"
@@ -5143,6 +5186,15 @@ dependencies = [
  "rayon",
 ]
 
+[[package]]
+name = "keccak"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
+dependencies = [
+ "cpufeatures",
+]
+
 [[package]]
 name = "keyring"
 version = "3.6.3"
@@ -8172,6 +8224,7 @@ dependencies = [
  "futures",
  "hash32 1.0.0",
  "human-repr",
+ "hwlocality",
  "iggy_common",
  "jsonwebtoken",
  "lending-iterator",
@@ -8239,6 +8292,16 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "sha3"
+version = "0.10.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60"
+dependencies = [
+ "digest",
+ "keccak",
+]
+
 [[package]]
 name = "sharded-slab"
 version = "0.1.7"
@@ -8855,6 +8918,17 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
 
+[[package]]
+name = "tar"
+version = "0.4.44"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a"
+dependencies = [
+ "filetime",
+ "libc",
+ "xattr",
+]
+
 [[package]]
 name = "tempfile"
 version = "3.23.0"
@@ -9737,6 +9811,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d39cb1dbab692d82a977c0392ffac19e188bd9186a9f32806f0aaa859d75585a"
 dependencies = [
  "base64 0.22.1",
+ "flate2",
  "log",
  "percent-encoding",
  "rustls",
diff --git a/Cargo.toml b/Cargo.toml
index 87ecac08c..f2491ac6d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -132,6 +132,7 @@ futures = "0.3.31"
 futures-util = "0.3.31"
 human-repr = "1.1.0"
 humantime = "2.3.0"
+hwlocality = "1.0.0-alpha.11"
 iceberg = "=0.6.0"
 iceberg-catalog-rest = "=0.6.0"
 iggy = { path = "core/sdk", version = "0.8.1-edge.1" }
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 51b3e5d83..1ba074063 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -73,6 +73,7 @@ atomic: 0.6.1, "Apache-2.0 OR MIT",
 atomic-polyfill: 1.0.3, "Apache-2.0 OR MIT",
 atomic-waker: 1.1.2, "Apache-2.0 OR MIT",
 autocfg: 1.5.0, "Apache-2.0 OR MIT",
+autotools: 0.2.7, "MIT",
 aws-lc-rs: 1.15.1, "(Apache-2.0 OR ISC) AND ISC",
 aws-lc-sys: 0.34.0, "(Apache-2.0 OR ISC) AND ISC AND OpenSSL",
 axum: 0.8.7, "MIT",
@@ -367,6 +368,8 @@ httparse: 1.10.1, "Apache-2.0 OR MIT",
 httpdate: 1.0.3, "Apache-2.0 OR MIT",
 human-repr: 1.1.0, "MIT",
 humantime: 2.3.0, "Apache-2.0 OR MIT",
+hwlocality: 1.0.0-alpha.11, "MIT",
+hwlocality-sys: 0.6.1, "MIT",
 hyper: 1.8.1, "MIT",
 hyper-named-pipe: 0.1.0, "Apache-2.0",
 hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT",
@@ -436,6 +439,7 @@ journal: 0.1.0, "Apache-2.0",
 js-sys: 0.3.83, "Apache-2.0 OR MIT",
 jsonwebtoken: 10.2.0, "MIT",
 jwalk: 0.8.1, "MIT",
+keccak: 0.1.5, "Apache-2.0 OR MIT",
 keyring: 3.6.3, "Apache-2.0 OR MIT",
 kqueue: 1.1.1, "MIT",
 kqueue-sys: 1.0.4, "MIT",
@@ -720,6 +724,7 @@ serial_test_derive: 3.2.0, "MIT",
 server: 0.6.1-edge.1, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
+sha3: 0.10.8, "Apache-2.0 OR MIT",
 sharded-slab: 0.1.7, "MIT",
 shlex: 1.3.0, "Apache-2.0 OR MIT",
 signal-hook-registry: 1.4.7, "Apache-2.0 OR MIT",
@@ -771,6 +776,7 @@ sysinfo: 0.34.2, "MIT",
 sysinfo: 0.37.2, "MIT",
 tagptr: 0.2.0, "Apache-2.0 OR MIT",
 tap: 1.0.1, "MIT",
+tar: 0.4.44, "Apache-2.0 OR MIT",
 tempfile: 3.23.0, "Apache-2.0 OR MIT",
 terminal_size: 0.4.3, "Apache-2.0 OR MIT",
 termtree: 0.5.1, "MIT",
diff --git a/Dockerfile b/Dockerfile
index 588cebdb2..f4ad3304b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -24,6 +24,8 @@ RUN apt-get update && apt-get install -y \
     build-essential \
     pkg-config \
     libssl-dev \
+    libhwloc-dev \
+    libudev-dev \
     && rm -rf /var/lib/apt/lists/*
 COPY . .
 RUN cargo build --bin iggy --release
diff --git a/bdd/rust/Dockerfile b/bdd/rust/Dockerfile
index 0f1032f2f..288fefd10 100644
--- a/bdd/rust/Dockerfile
+++ b/bdd/rust/Dockerfile
@@ -20,6 +20,12 @@ FROM rust:${RUST_VERSION}
 
 WORKDIR /app
 
+RUN apt-get update && apt-get install -y \
+    libhwloc-dev \
+    libudev-dev \
+    pkg-config \
+    && rm -rf /var/lib/apt/lists/*
+
 # Copy everything
 COPY . .
 
diff --git a/core/configs/server.toml b/core/configs/server.toml
index b1a25b61d..9e075eff4 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -556,7 +556,10 @@ ports = { tcp = 8091, quic = 8081, http = 3001, websocket 
= 8093 }
 # - "all": Use all available CPU cores (default)
 # - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3)
 # - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7
-cpu_allocation = "all"
+# - numa settings:
+#     + "numa:auto": Use all available NUMA nodes and cores
+#     + "numa:nodes=0,1;cores=4;no_ht=true": Use NUMA nodes 0 and 1, 4 cores per node, with hyperthreading disabled
+cpu_allocation = "numa:auto"
 
 [websocket]
 enabled = true
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index bb375bdf5..bbd5e699e 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -71,6 +71,7 @@ flume = { workspace = true }
 futures = { workspace = true }
 hash32 = "1.0.0"
 human-repr = { workspace = true }
+
 iggy_common = { workspace = true }
 jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
 lending-iterator = "0.1.7"
@@ -127,6 +128,12 @@ twox-hash = { workspace = true }
 ulid = "1.2.1"
 uuid = { workspace = true }
 
+[target.'cfg(not(target_env = "musl"))'.dependencies]
+hwlocality = { workspace = true }
+
+[target.'cfg(target_env = "musl")'.dependencies]
+hwlocality = { workspace = true, features = ["vendored"] }
+
 [build-dependencies]
 figment = { version = "0.10.19", features = ["json", "toml", "env"] }
 vergen-git2 = { version = "1.0.7", features = [
diff --git a/core/server/Dockerfile b/core/server/Dockerfile
index 52f9100d4..3a0ad7f94 100644
--- a/core/server/Dockerfile
+++ b/core/server/Dockerfile
@@ -97,6 +97,11 @@ ARG TARGETPLATFORM
 ARG PREBUILT_IGGY_SERVER
 ARG PREBUILT_IGGY_CLI
 WORKDIR /app
+RUN apt-get update && apt-get install -y \
+    libhwloc-dev \
+    libudev-dev \
+    pkg-config \
+    && rm -rf /var/lib/apt/lists/*
 COPY --from=prebuilt /out/iggy-server /usr/local/bin/iggy-server
 COPY --from=prebuilt /out/iggy        /usr/local/bin/iggy
 RUN echo "═══════════════════════════════════════════════════════════════" && \
@@ -120,6 +125,11 @@ ARG TARGETPLATFORM
 ARG PROFILE=release
 ARG LIBC=musl
 WORKDIR /app
+RUN apt-get update && apt-get install -y \
+    libhwloc15 \
+    libudev1 \
+    pkg-config \
+    && rm -rf /var/lib/apt/lists/*
 COPY --from=builder /app/iggy-server /usr/local/bin/iggy-server
 COPY --from=builder /app/iggy        /usr/local/bin/iggy
 RUN echo "═══════════════════════════════════════════════════════════════" && \
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 3f95da049..af066da2d 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -23,6 +23,7 @@ use crate::{
     configs::{
         cache_indexes::CacheIndexesConfig,
         server::ServerConfig,
+        sharding::ShardInfo,
         system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
     },
     io::fs_utils::{self, DirEntry},
@@ -250,13 +251,16 @@ pub fn load_users(state: impl IntoIterator<Item = 
UserState>) -> Users {
 }
 
 pub fn create_shard_connections(
-    shards_set: &HashSet<usize>,
+    shard_assignment: &[ShardInfo],
 ) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
-    let shards_count = shards_set.len();
-
     // Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU 
core numbers
-    let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count)
-        .map(|idx| ShardConnector::new(idx as u16))
+    let connectors: Vec<ShardConnector<ShardFrame>> = shard_assignment
+        .iter()
+        .enumerate()
+        .map(|(idx, _assignment)| {
+            // let cpu_id = assignment.cpu_set.iter().next().unwrap_or(&idx);
+            ShardConnector::new(idx as u16)
+        })
         .collect();
 
     let shutdown_handles = connectors
diff --git a/core/server/src/configs/sharding.rs 
b/core/server/src/configs/sharding.rs
index aa8214f6a..4ce31889f 100644
--- a/core/server/src/configs/sharding.rs
+++ b/core/server/src/configs/sharding.rs
@@ -16,10 +16,19 @@
  * under the License.
  */
 
+use hwlocality::Topology;
+use hwlocality::bitmap::SpecializedBitmapRef;
+use hwlocality::cpu::cpuset::CpuSet;
+use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
+use hwlocality::object::types::ObjectType::{self, NUMANode};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::collections::HashSet;
 use std::str::FromStr;
+use std::sync::Arc;
 use std::thread::available_parallelism;
+use tracing::info;
+
+use crate::server_error::ServerError;
 
 #[derive(Debug, Deserialize, Serialize, Default)]
 pub struct ShardingConfig {
@@ -33,21 +42,440 @@ pub enum CpuAllocation {
     All,
     Count(usize),
     Range(usize, usize),
+    NumaAware(NumaConfig),
+}
+
+/// NUMA specific configuration
+#[derive(Debug, Clone, PartialEq, Default)]
+pub struct NumaConfig {
+    /// Which NUMA nodes to use (empty = auto-detect all)
+    pub nodes: Vec<usize>,
+    /// Cores per node to use (0 = use all available)
+    pub cores_per_node: usize,
+    /// Skip hyperthread siblings (use only one logical CPU per physical core)
+    pub avoid_hyperthread: bool,
+}
+
+impl NumaConfig {
+    pub fn validate(&self, topology: &NumaTopology) -> Result<(), ServerError> 
{
+        let available_nodes = topology.node_count;
+
+        if available_nodes == 0 {
+            return Err(ServerError::NoNumaNodes);
+        }
+
+        for &node in &self.nodes {
+            if node >= available_nodes {
+                return Err(ServerError::InvalidNode {
+                    requested: node,
+                    available: available_nodes,
+                });
+            }
+        }
+
+        // Validate core per node
+        if self.cores_per_node > 0 {
+            for &node in &self.nodes {
+                let available_cores = if self.avoid_hyperthread {
+                    topology.physical_cores_for_node(node)
+                } else {
+                    topology.logical_cores_for_node(node)
+                };
+
+                info!(
+                    "core_per_node: {}, available_cores: {}",
+                    self.cores_per_node, available_cores
+                );
+
+                if self.cores_per_node > available_cores {
+                    return Err(ServerError::InsufficientCores {
+                        requested: self.cores_per_node,
+                        available: available_cores,
+                        node,
+                    });
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    // pub fn or_fallback(&self, topology: &NumaTopology) -> CpuAllocation {
+    //     if topology.node_count < 2 {
+    //         tracing::warn!(
+    //             "NUMA requested but only {} node detected, falling back to 
count",
+    //             topology.node_count
+    //         );
+    //     }
+    //
+    //     todo!()
+    // }
 }
 
 impl CpuAllocation {
-    pub fn to_shard_set(&self) -> HashSet<usize> {
-        match self {
+    fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
+        let params = s
+            .strip_prefix("numa:")
+            .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;
+
+        if params == "auto" {
+            return Ok(CpuAllocation::NumaAware(NumaConfig {
+                nodes: vec![],
+                cores_per_node: 0,
+                avoid_hyperthread: true,
+            }));
+        }
+
+        let mut nodes = Vec::new();
+        let mut cores_per_node = 0;
+        let mut avoid_hyperthread = true;
+
+        for param in params.split(';') {
+            let kv: Vec<&str> = param.split('=').collect();
+            if kv.len() != 2 {
+                return Err(format!(
+                    "Invalid NUMA parameter: '{param}', only available: 'auto'"
+                ));
+            }
+
+            match kv[0] {
+                "nodes" => {
+                    nodes = kv[1]
+                        .split(',')
+                        .map(|n| {
+                            n.parse::<usize>()
+                                .map_err(|_| format!("Invalid node number: 
{n}"))
+                        })
+                        .collect::<Result<Vec<_>, _>>()?;
+                }
+                "cores" => {
+                    cores_per_node = kv[1]
+                        .parse::<usize>()
+                        .map_err(|_| format!("Invalid cores value: {}", 
kv[1]))?;
+                }
+                "no_ht" => {
+                    avoid_hyperthread = kv[1]
+                        .parse::<bool>()
+                        .map_err(|_| format!("Invalid no ht value: {}", 
kv[1]))?;
+                }
+                _ => {
+                    return Err(format!(
+                        "Unknown NUMA parameter: {}, example: 
numa:nodes=0;cores=4;no_ht=true",
+                        kv[0]
+                    ));
+                }
+            }
+        }
+
+        Ok(CpuAllocation::NumaAware(NumaConfig {
+            nodes,
+            cores_per_node,
+            avoid_hyperthread,
+        }))
+    }
+}
+
+#[derive(Debug)]
+pub struct NumaTopology {
+    topology: Topology,
+    node_count: usize,
+    physical_cores_per_node: Vec<usize>,
+    logical_cores_per_node: Vec<usize>,
+}
+
+impl NumaTopology {
+    pub fn detect() -> Result<NumaTopology, ServerError> {
+        let topology =
+            Topology::new().map_err(|e| ServerError::TopologyDetection { msg: 
e.to_string() })?;
+
+        let numa_nodes: Vec<_> = 
topology.objects_with_type(NUMANode).collect();
+
+        let node_count = numa_nodes.len();
+
+        if node_count == 0 {
+            return Err(ServerError::NoNumaNodes);
+        }
+
+        let mut physical_cores_per_node = Vec::new();
+        let mut logical_cores_per_node = Vec::new();
+
+        for node in numa_nodes {
+            let cpuset = node.cpuset().ok_or(ServerError::TopologyDetection {
+                msg: "NUMA node has no CPU set".to_string(),
+            })?;
+
+            let logical_cores = cpuset.weight().unwrap_or(0);
+
+            let physical_cores = topology
+                .objects_with_type(ObjectType::Core)
+                .filter(|core| {
+                    if let Some(core_cpuset) = core.cpuset() {
+                        !(cpuset & core_cpuset).is_empty()
+                    } else {
+                        false
+                    }
+                })
+                .count();
+
+            physical_cores_per_node.push(physical_cores);
+            logical_cores_per_node.push(logical_cores);
+        }
+
+        Ok(Self {
+            topology,
+            node_count,
+            physical_cores_per_node,
+            logical_cores_per_node,
+        })
+    }
+
+    pub fn physical_cores_for_node(&self, node: usize) -> usize {
+        self.physical_cores_per_node.get(node).copied().unwrap_or(0)
+    }
+
+    pub fn logical_cores_for_node(&self, node: usize) -> usize {
+        self.logical_cores_per_node.get(node).copied().unwrap_or(0)
+    }
+
+    fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet {
+        let mut physical_cpuset = CpuSet::new();
+        for core in self.topology.objects_with_type(ObjectType::Core) {
+            if let Some(core_cpuset) = core.cpuset() {
+                let intersection = node_cpuset.clone() & core_cpuset;
+                if !intersection.is_empty() {
+                    // Take the minimum (first) CPU ID for consistency
+                    if let Some(first_cpu) = intersection.iter_set().min() {
+                        physical_cpuset.set(first_cpu)
+                    }
+                }
+            }
+        }
+        physical_cpuset
+    }
+
+    /// Get CPU set for a NUMA node
+    fn get_cpuset_for_node(
+        &self,
+        node_id: usize,
+        avoid_hyperthread: bool,
+    ) -> Result<CpuSet, ServerError> {
+        let node = self
+            .topology
+            .objects_with_type(ObjectType::NUMANode)
+            .nth(node_id)
+            .ok_or(ServerError::InvalidNode {
+                requested: node_id,
+                available: self.node_count,
+            })?;
+
+        let cpuset_ref = node.cpuset().ok_or(ServerError::TopologyDetection {
+            msg: format!("Node {} has no CPU set", node_id),
+        })?;
+
+        let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref);
+
+        if avoid_hyperthread {
+            Ok(self.filter_physical_cores(cpuset))
+        } else {
+            Ok(cpuset)
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ShardInfo {
+    /// CPUs this shard should use
+    pub cpu_set: HashSet<usize>,
+    /// NUMA node
+    pub numa_node: Option<usize>,
+}
+
+impl ShardInfo {
+    pub fn bind_memory(&self) -> Result<(), ServerError> {
+        if let Some(node_id) = self.numa_node {
+            let topology = Topology::new().map_err(|err| 
ServerError::TopologyDetection {
+                msg: err.to_string(),
+            })?;
+
+            let node = topology
+                .objects_with_type(ObjectType::NUMANode)
+                .nth(node_id)
+                .ok_or(ServerError::InvalidNode {
+                    requested: node_id,
+                    available: 
topology.objects_with_type(ObjectType::NUMANode).count(),
+                })?;
+
+            if let Some(nodeset) = node.nodeset() {
+                topology
+                    .bind_memory(
+                        nodeset,
+                        MemoryBindingPolicy::Bind,
+                        MemoryBindingFlags::THREAD | 
MemoryBindingFlags::STRICT,
+                    )
+                    .map_err(|err| {
+                        tracing::error!("Failed to bind memory {:?}", err);
+                        ServerError::BindingFailed
+                    })?;
+
+                info!("Memory bound to NUMA node {node_id}");
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub struct ShardAllocator {
+    allocation: CpuAllocation,
+    topology: Option<Arc<NumaTopology>>,
+}
+
+impl ShardAllocator {
+    pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator, 
ServerError> {
+        let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) {
+            let numa_topology = NumaTopology::detect()?;
+
+            Some(Arc::new(numa_topology))
+        } else {
+            None
+        };
+
+        Ok(Self {
+            allocation: allocation.clone(),
+            topology,
+        })
+    }
+
+    pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, ServerError> {
+        match &self.allocation {
             CpuAllocation::All => {
                 let available_cpus = available_parallelism()
-                    .expect("Failed to get num of cores")
+                    .map_err(|err| ServerError::Other {
+                        msg: format!("Failed to get available_parallelism: 
{:?}", err),
+                    })?
                     .get();
-                (0..available_cpus).collect()
+
+                let shard_assignments: Vec<_> = (0..available_cpus)
+                    .map(|cpu_id| ShardInfo {
+                        cpu_set: HashSet::from([cpu_id]),
+                        numa_node: None,
+                    })
+                    .collect();
+
+                info!(
+                    "Using all available CPU cores ({} shards with affinity)",
+                    shard_assignments.len()
+                );
+
+                Ok(shard_assignments)
+            }
+            CpuAllocation::Count(count) => {
+                let shard_assignments = (0..*count)
+                    .map(|cpu_id| ShardInfo {
+                        cpu_set: HashSet::from([cpu_id]),
+                        numa_node: None,
+                    })
+                    .collect();
+
+                info!("Using {count} shards with affinity to cores 
0..{count}");
+
+                Ok(shard_assignments)
+            }
+            CpuAllocation::Range(start, end) => {
+                let shard_assignments = (*start..*end)
+                    .map(|cpu_id| ShardInfo {
+                        cpu_set: HashSet::from([cpu_id]),
+                        numa_node: None,
+                    })
+                    .collect();
+
+                info!(
+                    "Using {} shards with affinity to cores {start}..{end}",
+                    end - start
+                );
+
+                Ok(shard_assignments)
+            }
+            CpuAllocation::NumaAware(numa_config) => {
+                let topology = 
self.topology.as_ref().ok_or(ServerError::NoTopology)?;
+                self.compute_numa_assignments(topology, numa_config)
             }
-            CpuAllocation::Count(count) => (0..*count).collect(),
-            CpuAllocation::Range(start, end) => (*start..*end).collect(),
         }
     }
+
+    fn compute_numa_assignments(
+        &self,
+        topology: &NumaTopology,
+        numa: &NumaConfig,
+    ) -> Result<Vec<ShardInfo>, ServerError> {
+        // Determine which nodes to use
+        let nodes = if numa.nodes.is_empty() {
+            // Auto: use all nodes
+            (0..topology.node_count).collect()
+        } else {
+            numa.nodes.clone()
+        };
+
+        // Determine cores per node
+        let cores_per_node = if numa.cores_per_node == 0 {
+            // Auto: use all available
+            if numa.avoid_hyperthread {
+                topology.physical_cores_for_node(nodes[0])
+            } else {
+                topology.logical_cores_for_node(nodes[0])
+            }
+        } else {
+            numa.cores_per_node
+        };
+
+        let mut shard_infos = Vec::new();
+
+        let node_cpus: Vec<Vec<usize>> = nodes
+            .iter()
+            .map(|&node_id| {
+                let cpuset = topology.get_cpuset_for_node(node_id, 
numa.avoid_hyperthread)?;
+                Ok(cpuset.iter_set().map(usize::from).collect())
+            })
+            .collect::<Result<_, ServerError>>()?;
+
+        // For each node, create one shard per core (thread-per-core)
+        for (idx, &node_id) in nodes.iter().enumerate() {
+            let available_cpus = &node_cpus[idx];
+
+            // Take first core_per_node CPUs from this node
+            let cores_to_use: Vec<usize> = available_cpus
+                .iter()
+                .take(cores_per_node)
+                .copied()
+                .collect();
+
+            if cores_to_use.len() < cores_per_node {
+                return Err(ServerError::InsufficientCores {
+                    requested: cores_per_node,
+                    available: available_cpus.len(),
+                    node: node_id,
+                });
+            }
+
+            // Create one shard per core
+            for cpu_id in cores_to_use {
+                shard_infos.push(ShardInfo {
+                    cpu_set: HashSet::from([cpu_id]),
+                    numa_node: Some(node_id),
+                });
+            }
+        }
+
+        info!(
+            "Using {} shards with {} NUMA node, {} cores per node, and avoid 
hyperthread {}",
+            shard_infos.len(),
+            nodes.len(),
+            cores_per_node,
+            numa.avoid_hyperthread
+        );
+
+        Ok(shard_infos)
+    }
 }
 
 impl FromStr for CpuAllocation {
@@ -56,6 +484,7 @@ impl FromStr for CpuAllocation {
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s {
             "all" => Ok(CpuAllocation::All),
+            s if s.starts_with("numa:") => Self::parse_numa(s),
             s if s.contains("..") => {
                 let parts: Vec<&str> = s.split("..").collect();
                 if parts.len() != 2 {
@@ -90,6 +519,25 @@ impl Serialize for CpuAllocation {
             CpuAllocation::Range(start, end) => {
                 serializer.serialize_str(&format!("{start}..{end}"))
             }
+            CpuAllocation::NumaAware(numa) => {
+                if numa.nodes.is_empty() && numa.cores_per_node == 0 {
+                    serializer.serialize_str("numa:auto")
+                } else {
+                    let nodes_str = numa
+                        .nodes
+                        .iter()
+                        .map(|n| n.to_string())
+                        .collect::<Vec<_>>()
+                        .join(",");
+
+                    let full_str = format!(
+                        "numa:nodes={};cores:{};no_ht={}",
+                        nodes_str, numa.cores_per_node, numa.avoid_hyperthread
+                    );
+
+                    serializer.serialize_str(&full_str)
+                }
+            }
         }
     }
 }
@@ -114,3 +562,55 @@ impl<'de> Deserialize<'de> for CpuAllocation {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_all() {
+        assert_eq!(CpuAllocation::from_str("all").unwrap(), 
CpuAllocation::All);
+    }
+
+    #[test]
+    fn test_parse_count() {
+        assert_eq!(
+            CpuAllocation::from_str("4").unwrap(),
+            CpuAllocation::Count(4)
+        );
+    }
+
+    #[test]
+    fn test_parse_range() {
+        assert_eq!(
+            CpuAllocation::from_str("2..8").unwrap(),
+            CpuAllocation::Range(2, 8)
+        );
+    }
+
+    #[test]
+    fn test_parse_numa_auto() {
+        let result = CpuAllocation::from_str("numa:auto").unwrap();
+        match result {
+            CpuAllocation::NumaAware(numa) => {
+                assert!(numa.nodes.is_empty());
+                assert_eq!(numa.cores_per_node, 0);
+                assert!(numa.avoid_hyperthread);
+            }
+            _ => panic!("Expected NumaAware"),
+        }
+    }
+
+    #[test]
+    fn test_parse_numa_explicit() {
+        let result = 
CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap();
+        match result {
+            CpuAllocation::NumaAware(numa) => {
+                assert_eq!(numa.nodes, vec![0, 1]);
+                assert_eq!(numa.cores_per_node, 4);
+                assert!(numa.avoid_hyperthread);
+            }
+            _ => panic!("Expected NumaAware"),
+        }
+    }
+}
diff --git a/core/server/src/configs/validators.rs 
b/core/server/src/configs/validators.rs
index ab0547dea..77e792023 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -25,6 +25,7 @@ use super::sharding::{CpuAllocation, ShardingConfig};
 use super::system::{CompressionConfig, PartitionConfig};
 use crate::configs::COMPONENT;
 use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig, 
ServerConfig};
+use crate::configs::sharding::NumaTopology;
 use crate::configs::system::SegmentConfig;
 use crate::streaming::segments::*;
 use err_trail::ErrContext;
@@ -321,6 +322,18 @@ impl Validatable<ConfigurationError> for ShardingConfig {
                 }
                 Ok(())
             }
+            CpuAllocation::NumaAware(numa_config) => match 
NumaTopology::detect() {
+                // TODO: deduplicate this validation; it is already performed during shard allocation
+                Ok(topology) => numa_config.validate(&topology).map_err(|e| {
+                    eprintln!("Invalid NUMA configuration: {}", e);
+                    ConfigurationError::InvalidConfigurationValue
+                }),
+                Err(e) => {
+                    eprintln!("Failed to detect NUMA topology: {}", e);
+                    eprintln!("NUMA allocation requested but system doesn't 
support it");
+                    Err(ConfigurationError::InvalidConfigurationValue)
+                }
+            },
         }
     }
 }
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 4d9d0f08e..e2d47f79c 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,7 +30,7 @@ use server::bootstrap::{
     create_directories, create_shard_connections, create_shard_executor, 
load_config, load_streams,
     load_users, resolve_persister, update_system_info,
 };
-use server::configs::sharding::CpuAllocation;
+use server::configs::sharding::ShardAllocator;
 use server::diagnostics::{print_io_uring_permission_info, 
print_locked_memory_limit_info};
 use server::io::fs_utils;
 use server::log::logger::Logging;
@@ -49,7 +49,6 @@ use server::streaming::diagnostics::metrics::Metrics;
 use server::streaming::storage::SystemStorage;
 use server::streaming::utils::ptr::EternalPtr;
 use server::versioning::SemanticVersion;
-use std::collections::HashSet;
 use std::panic::AssertUnwindSafe;
 use std::rc::Rc;
 use std::str::FromStr;
@@ -284,24 +283,8 @@ fn main() -> Result<(), ServerError> {
         let users = load_users(users_state.into_values());
 
         // ELEVENTH DISCRETE LOADING STEP.
-        let shards_set = config.system.sharding.cpu_allocation.to_shard_set();
-        match &config.system.sharding.cpu_allocation {
-            CpuAllocation::All => {
-                info!(
-                    "Using all available CPU cores ({} shards with affinity)",
-                    shards_set.len()
-                );
-            }
-            CpuAllocation::Count(count) => {
-                info!("Using {count} shards with affinity to cores 
0..{count}");
-            }
-            CpuAllocation::Range(start, end) => {
-                info!(
-                    "Using {} shards with affinity to cores {start}..{end}",
-                    end - start
-                );
-            }
-        }
+        let shard_allocator = 
ShardAllocator::new(&config.system.sharding.cpu_allocation)?;
+        let shard_assignment = shard_allocator.to_shard_assignments()?;
 
         #[cfg(feature = "disable-mimalloc")]
         warn!("Using default system allocator because code was build with 
`disable-mimalloc` feature");
@@ -313,9 +296,9 @@ fn main() -> Result<(), ServerError> {
         let metrics = Metrics::init();
 
         // TWELFTH DISCRETE LOADING STEP.
-        let shards_count = shards_set.len();
-        info!("Starting {} shard(s)", shards_count);
-        let (connections, shutdown_handles) = 
create_shard_connections(&shards_set);
+        info!("Starting {} shard(s)", shard_assignment.len());
+        let (connections, shutdown_handles) = 
create_shard_connections(&shard_assignment);
+        let shards_count = shard_assignment.len();
         let mut handles: Vec<JoinHandle<()>> = 
Vec::with_capacity(shards_count);
 
         // Channel for shard completion notifications
@@ -348,7 +331,7 @@ fn main() -> Result<(), ServerError> {
                                 let ns = IggyNamespace::new(stream_id, 
topic_id, partition_id);
                                 let shard_id = 
ShardId::new(calculate_shard_assignment(
                                     &ns,
-                                    shards_set.len() as u32,
+                                    shard_assignment.len() as u32,
                                 ));
                                 shards_table.insert(ns, shard_id);
                             }
@@ -358,10 +341,10 @@ fn main() -> Result<(), ServerError> {
             }
         });
 
-        for (id, cpu_id) in shards_set
+        for (id, assignment) in shard_assignment
             .into_iter()
             .enumerate()
-            .map(|(idx, cpu)| (idx as u16, cpu))
+            .map(|(idx, assignment)| (idx as u16, assignment))
         {
             let streams = streams.clone();
             let shards_table = shards_table.clone();
@@ -412,8 +395,11 @@ fn main() -> Result<(), ServerError> {
                 .name(format!("shard-{id}"))
                 .spawn(move || {
                     let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
-                        let affinity_set = HashSet::from([cpu_id]);
-                        let rt = create_shard_executor(affinity_set);
+                        if let Err(e) = assignment.bind_memory() {
+                            error!("Failed to bind memory: {e:?}");
+                        }
+
+                        let rt = create_shard_executor(assignment.cpu_set);
                         rt.block_on(async move {
                             let builder = IggyShard::builder();
                             let shard = builder
@@ -438,6 +424,7 @@ fn main() -> Result<(), ServerError> {
                                 return Err(e.to_string());
                             }
                             info!("Shard {id} run completed");
+
                             Ok(())
                         })
                     }));
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index c7e211465..f49e10468 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -23,7 +23,7 @@ use std::array::TryFromSliceError;
 use std::io;
 
 error_set!(
-    ServerError := ConfigurationError || ArchiverError || ConnectionError || 
LogError || CompatError || QuicError || ShardError
+    ServerError := NumaError || ConfigurationError || ArchiverError || 
ConnectionError || LogError || CompatError || QuicError || ShardError
 
     IoError := {
         #[display("IO error")]
@@ -36,6 +36,37 @@ error_set!(
         ReadToEndError(ReadError)
     }
 
+    NumaError := {
+        #[display("Failed to detect topology: {}", msg)]
+        TopologyDetection {
+            msg: String
+        },
+
+        #[display("No NUMA nodes are available on this server")]
+        NoNumaNodes,
+
+        #[display("NUMA topology has not been detected")]
+        NoTopology,
+
+        #[display("Failed to bind memory to the requested NUMA node")]
+        BindingFailed,
+
+        #[display("Insufficient cores on node {}: requested {}, only {} 
available", node, requested, available)]
+        InsufficientCores {
+            requested: usize,
+            available: usize,
+            node: usize,
+        },
+
+        #[display("Invalid NUMA node: requested node {}, but only {} nodes are available", requested, available)]
+        InvalidNode { requested: usize, available: usize },
+
+        #[display("Other error: {}", msg)]
+        Other {
+            msg: String
+        },
+    }
+
     ConfigurationError := {
         ConfigurationError(iggy_common::ConfigurationError),
     }


Reply via email to