This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new bcfa50cf8 refactor(configs): move ServerConfig types from server to 
configs crate (#2796)
bcfa50cf8 is described below

commit bcfa50cf8ff975411fb7d9bbba4ddf73f346c75d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Feb 26 18:56:43 2026 +0100

    refactor(configs): move ServerConfig types from server to configs crate 
(#2796)
---
 Cargo.lock                                         |  17 +-
 DEPENDENCIES.md                                    |   2 +-
 core/common/src/alloc/memory_pool.rs               |   3 +
 core/configs/Cargo.toml                            |   8 +
 core/configs/src/lib.rs                            |   6 +-
 .../src/server_config}/cache_indexes.rs            |   0
 .../src/server_config}/cluster.rs                  |   0
 .../src/server_config}/defaults.rs                 |  26 +-
 .../src/server_config}/displays.rs                 |   8 +-
 .../configs => configs/src/server_config}/http.rs  |   0
 .../configs => configs/src/server_config}/mod.rs   |   0
 .../configs => configs/src/server_config}/quic.rs  |   0
 .../src/server_config}/server.rs                   |  19 +-
 core/configs/src/server_config/sharding.rs         | 261 +++++++++
 .../src/server_config}/system.rs                   |   2 +-
 .../configs => configs/src/server_config}/tcp.rs   |   0
 .../src/server_config}/validators.rs               |  41 +-
 .../src/server_config}/websocket.rs                |   0
 core/server/Cargo.toml                             |   5 -
 core/server/src/bootstrap.rs                       |   2 +-
 core/server/src/{configs/mod.rs => configs.rs}     |  18 +-
 core/server/src/configs/sharding.rs                | 637 ---------------------
 core/server/src/lib.rs                             |   1 +
 core/server/src/main.rs                            |   2 +-
 core/server/src/server_error.rs                    |  30 +-
 core/server/src/shard_allocator.rs                 | 379 ++++++++++++
 core/server/src/streaming/segments/mod.rs          |   2 +-
 27 files changed, 717 insertions(+), 752 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 76d8605cf..124dfe960 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1936,10 +1936,18 @@ name = "configs"
 version = "0.1.0"
 dependencies = [
  "configs_derive",
+ "derive_more",
+ "err_trail",
  "figment",
+ "iggy_common",
+ "jsonwebtoken",
  "serde",
  "serde_json",
+ "serde_with",
+ "static-toml",
+ "strum",
  "tracing",
+ "tungstenite",
 ]
 
 [[package]]
@@ -3161,7 +3169,6 @@ dependencies = [
  "atomic",
  "pear",
  "serde",
- "serde_json",
  "toml 0.8.23",
  "uncased",
  "version_check",
@@ -5273,9 +5280,9 @@ dependencies = [
 
 [[package]]
 name = "keccak"
-version = "0.1.5"
+version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
+checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653"
 dependencies = [
  "cpufeatures 0.2.17",
 ]
@@ -8338,13 +8345,11 @@ dependencies = [
  "cyper",
  "cyper-axum",
  "dashmap",
- "derive_more",
  "dotenvy",
  "enum_dispatch",
  "err_trail",
  "error_set",
  "figlet-rs",
- "figment",
  "flume 0.12.0",
  "fs2",
  "futures",
@@ -8375,10 +8380,8 @@ dependencies = [
  "rustls-pemfile",
  "send_wrapper",
  "serde",
- "serde_with",
  "slab",
  "socket2 0.6.2",
- "static-toml",
  "strum",
  "sysinfo 0.38.1",
  "tempfile",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index ffe0d501e..52bb36d63 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -439,7 +439,7 @@ journal: 0.1.0, "Apache-2.0",
 js-sys: 0.3.85, "Apache-2.0 OR MIT",
 jsonwebtoken: 10.3.0, "MIT",
 jwalk: 0.8.1, "MIT",
-keccak: 0.1.5, "Apache-2.0 OR MIT",
+keccak: 0.1.6, "Apache-2.0 OR MIT",
 keyring: 3.6.3, "Apache-2.0 OR MIT",
 kqueue: 1.1.1, "MIT",
 kqueue-sys: 1.0.4, "MIT",
diff --git a/core/common/src/alloc/memory_pool.rs 
b/core/common/src/alloc/memory_pool.rs
index 8ebc892c9..9216ef57f 100644
--- a/core/common/src/alloc/memory_pool.rs
+++ b/core/common/src/alloc/memory_pool.rs
@@ -73,6 +73,9 @@ pub fn memory_pool() -> &'static MemoryPool {
         .expect("Memory pool not initialized - MemoryPool::init_pool should be 
called first")
 }
 
+// TODO: Extract shared domain types (IggyByteSize, IggyDuration, etc.) into 
an `iggy_types`
+// leaf crate so `iggy_common` can depend on `configs` directly. That lets us 
delete this
+// duplicate and use `configs::server::MemoryPoolConfig` here instead.
 /// Configuration for the memory pool.
 #[derive(Debug)]
 pub struct MemoryPoolConfigOther {
diff --git a/core/configs/Cargo.toml b/core/configs/Cargo.toml
index d03f44c04..ed7d6eba3 100644
--- a/core/configs/Cargo.toml
+++ b/core/configs/Cargo.toml
@@ -24,7 +24,15 @@ publish = false
 
 [dependencies]
 configs_derive = { workspace = true }
+derive_more = { workspace = true }
+err_trail = { workspace = true }
 figment = { workspace = true }
+iggy_common = { workspace = true }
+jsonwebtoken = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
+serde_with = { workspace = true }
+static-toml = { workspace = true }
+strum = { workspace = true }
 tracing = { workspace = true }
+tungstenite = { workspace = true }
diff --git a/core/configs/src/lib.rs b/core/configs/src/lib.rs
index d1410ab8a..419c77b8a 100644
--- a/core/configs/src/lib.rs
+++ b/core/configs/src/lib.rs
@@ -20,9 +20,13 @@
 extern crate self as configs;
 
 mod configs_impl;
-
+mod server_config;
 pub use configs_derive::ConfigEnv;
 pub use configs_impl::{
     ConfigEnvMappings, ConfigProvider, ConfigurationError, ConfigurationType, 
EnvVarMapping,
     FileConfigProvider, TypedEnvProvider, parse_env_value_to_json,
 };
+pub use server_config::{
+    COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server, 
sharding, system,
+    tcp, validators, websocket,
+};
diff --git a/core/server/src/configs/cache_indexes.rs 
b/core/configs/src/server_config/cache_indexes.rs
similarity index 100%
rename from core/server/src/configs/cache_indexes.rs
rename to core/configs/src/server_config/cache_indexes.rs
diff --git a/core/server/src/configs/cluster.rs 
b/core/configs/src/server_config/cluster.rs
similarity index 100%
rename from core/server/src/configs/cluster.rs
rename to core/configs/src/server_config/cluster.rs
diff --git a/core/server/src/configs/defaults.rs 
b/core/configs/src/server_config/defaults.rs
similarity index 96%
rename from core/server/src/configs/defaults.rs
rename to core/configs/src/server_config/defaults.rs
index d2f1e61af..27eae580d 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/configs/src/server_config/defaults.rs
@@ -16,35 +16,33 @@
  * under the License.
  */
 
-use super::sharding::ShardingConfig;
-use super::tcp::TcpSocketConfig;
-use crate::configs::cluster::CurrentNodeConfig;
-use crate::configs::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig, 
TransportPorts};
-use crate::configs::http::{
-    HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, 
HttpTlsConfig,
-};
-use crate::configs::quic::{QuicCertificateConfig, QuicConfig, 
QuicSocketConfig};
-use crate::configs::server::{
+use super::cluster::CurrentNodeConfig;
+use super::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig, 
TransportPorts};
+use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, 
HttpMetricsConfig, HttpTlsConfig};
+use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig};
+use super::server::{
     ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, 
MemoryPoolConfig,
     MessageSaverConfig, MessagesMaintenanceConfig, 
PersonalAccessTokenCleanerConfig,
     PersonalAccessTokenConfig, ServerConfig, TelemetryConfig, 
TelemetryLogsConfig,
     TelemetryTracesConfig,
 };
-use crate::configs::system::{
+use super::sharding::ShardingConfig;
+use super::system::{
     BackupConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig, 
LoggingConfig,
     MessageDeduplicationConfig, PartitionConfig, RecoveryConfig, 
RuntimeConfig, SegmentConfig,
     StateConfig, StreamConfig, SystemConfig, TopicConfig,
 };
-use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
-use crate::configs::websocket::{WebSocketConfig, WebSocketTlsConfig};
+use super::tcp::TcpSocketConfig;
+use super::tcp::{TcpConfig, TcpTlsConfig};
+use super::websocket::{WebSocketConfig, WebSocketTlsConfig};
 use iggy_common::IggyByteSize;
 use iggy_common::IggyDuration;
 use std::sync::Arc;
 use std::time::Duration;
 
 static_toml::static_toml! {
-    // static_toml crate always starts from CARGO_MANIFEST_DIR (core/server)
-    pub static SERVER_CONFIG = include_toml!("config.toml");
+    // static_toml resolves relative to CARGO_MANIFEST_DIR (core/configs/).
+    pub static SERVER_CONFIG = include_toml!("../server/config.toml");
 }
 
 impl Default for ServerConfig {
diff --git a/core/server/src/configs/displays.rs 
b/core/configs/src/server_config/displays.rs
similarity index 98%
rename from core/server/src/configs/displays.rs
rename to core/configs/src/server_config/displays.rs
index df7741564..959d35476 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/configs/src/server_config/displays.rs
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-use crate::configs::quic::{QuicCertificateConfig, QuicConfig};
-use crate::configs::server::{
+use super::quic::{QuicCertificateConfig, QuicConfig};
+use super::server::{
     ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, 
MessagesMaintenanceConfig,
     TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig,
 };
-use crate::configs::system::MessageDeduplicationConfig;
-use crate::configs::{
+use super::system::MessageDeduplicationConfig;
+use super::{
     http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, 
HttpTlsConfig},
     server::{MessageSaverConfig, ServerConfig},
     system::{
diff --git a/core/server/src/configs/http.rs 
b/core/configs/src/server_config/http.rs
similarity index 100%
rename from core/server/src/configs/http.rs
rename to core/configs/src/server_config/http.rs
diff --git a/core/server/src/configs/mod.rs 
b/core/configs/src/server_config/mod.rs
similarity index 100%
copy from core/server/src/configs/mod.rs
copy to core/configs/src/server_config/mod.rs
diff --git a/core/server/src/configs/quic.rs 
b/core/configs/src/server_config/quic.rs
similarity index 100%
rename from core/server/src/configs/quic.rs
rename to core/configs/src/server_config/quic.rs
diff --git a/core/server/src/configs/server.rs 
b/core/configs/src/server_config/server.rs
similarity index 93%
rename from core/server/src/configs/server.rs
rename to core/configs/src/server_config/server.rs
index 850384f9e..8c005f599 100644
--- a/core/server/src/configs/server.rs
+++ b/core/configs/src/server_config/server.rs
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::configs::COMPONENT;
-use crate::configs::cluster::ClusterConfig;
-use crate::configs::http::HttpConfig;
-use crate::configs::quic::QuicConfig;
-use crate::configs::system::SystemConfig;
-use crate::configs::tcp::TcpConfig;
-use crate::configs::websocket::WebSocketConfig;
-use crate::server_error::ConfigurationError;
+use super::COMPONENT;
+use super::cluster::ClusterConfig;
+use super::http::HttpConfig;
+use super::quic::QuicConfig;
+use super::system::SystemConfig;
+use super::tcp::TcpConfig;
+use super::websocket::WebSocketConfig;
+use crate::ConfigurationError;
 use configs::{ConfigEnv, ConfigEnvMappings, ConfigProvider, 
FileConfigProvider, TypedEnvProvider};
 use derive_more::Display;
 use err_trail::ErrContext;
@@ -66,7 +66,6 @@ pub struct MemoryPoolConfig {
     pub bucket_capacity: u32,
 }
 
-// Hack around the fact that we define our config inside of the `server`  
crate, but `memory_pool` is in `common`.
 impl MemoryPoolConfig {
     pub fn into_other(&self) -> MemoryPoolConfigOther {
         MemoryPoolConfigOther {
@@ -204,7 +203,7 @@ impl ServerConfig {
 
     /// Create a config provider using compile-time generated env var mappings.
     pub fn config_provider(config_path: &str) -> 
FileConfigProvider<ServerConfigEnvProvider> {
-        let default_config = Toml::string(include_str!("../../config.toml"));
+        let default_config = 
Toml::string(include_str!("../../../server/config.toml"));
         FileConfigProvider::new(
             config_path.to_string(),
             ServerConfigEnvProvider::default(),
diff --git a/core/configs/src/server_config/sharding.rs 
b/core/configs/src/server_config/sharding.rs
new file mode 100644
index 000000000..084c984e1
--- /dev/null
+++ b/core/configs/src/server_config/sharding.rs
@@ -0,0 +1,261 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+use std::str::FromStr;
+
+use configs::ConfigEnv;
+
+#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
+pub struct ShardingConfig {
+    #[serde(default)]
+    #[config_env(leaf)]
+    pub cpu_allocation: CpuAllocation,
+}
+
+#[derive(Debug, Clone, PartialEq, Default)]
+pub enum CpuAllocation {
+    #[default]
+    All,
+    Count(usize),
+    Range(usize, usize),
+    NumaAware(NumaConfig),
+}
+
+/// NUMA specific configuration
+#[derive(Debug, Clone, PartialEq, Default)]
+pub struct NumaConfig {
+    /// Which NUMA nodes to use (empty = auto-detect all)
+    pub nodes: Vec<usize>,
+    /// Cores per node to use (0 = use all available)
+    pub cores_per_node: usize,
+    /// skip hyperthread sibling
+    pub avoid_hyperthread: bool,
+}
+
+impl CpuAllocation {
+    fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
+        let params = s
+            .strip_prefix("numa:")
+            .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;
+
+        if params == "auto" {
+            return Ok(CpuAllocation::NumaAware(NumaConfig {
+                nodes: vec![],
+                cores_per_node: 0,
+                avoid_hyperthread: true,
+            }));
+        }
+
+        let mut nodes = Vec::new();
+        let mut cores_per_node = 0;
+        let mut avoid_hyperthread = true;
+
+        for param in params.split(';') {
+            let kv: Vec<&str> = param.split('=').collect();
+            if kv.len() != 2 {
+                return Err(format!(
+                    "Invalid NUMA parameter: '{param}', only available: 'auto'"
+                ));
+            }
+
+            match kv[0] {
+                "nodes" => {
+                    nodes = kv[1]
+                        .split(',')
+                        .map(|n| {
+                            n.parse::<usize>()
+                                .map_err(|_| format!("Invalid node number: 
{n}"))
+                        })
+                        .collect::<Result<Vec<_>, _>>()?;
+                }
+                "cores" => {
+                    cores_per_node = kv[1]
+                        .parse::<usize>()
+                        .map_err(|_| format!("Invalid cores value: {}", 
kv[1]))?;
+                }
+                "no_ht" => {
+                    avoid_hyperthread = kv[1]
+                        .parse::<bool>()
+                        .map_err(|_| format!("Invalid no ht value: {}", 
kv[1]))?;
+                }
+                _ => {
+                    return Err(format!(
+                        "Unknown NUMA parameter: {}, example: 
numa:nodes=0;cores=4;no_ht=true",
+                        kv[0]
+                    ));
+                }
+            }
+        }
+
+        Ok(CpuAllocation::NumaAware(NumaConfig {
+            nodes,
+            cores_per_node,
+            avoid_hyperthread,
+        }))
+    }
+}
+
+impl FromStr for CpuAllocation {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "all" => Ok(CpuAllocation::All),
+            s if s.starts_with("numa:") => Self::parse_numa(s),
+            s if s.contains("..") => {
+                let parts: Vec<&str> = s.split("..").collect();
+                if parts.len() != 2 {
+                    return Err(format!("Invalid range format: {s}. Expected 
'start..end'"));
+                }
+                let start = parts[0]
+                    .parse::<usize>()
+                    .map_err(|_| format!("Invalid start value: {}", 
parts[0]))?;
+                let end = parts[1]
+                    .parse::<usize>()
+                    .map_err(|_| format!("Invalid end value: {}", parts[1]))?;
+                Ok(CpuAllocation::Range(start, end))
+            }
+            s => {
+                let count = s
+                    .parse::<usize>()
+                    .map_err(|_| format!("Invalid shard count: {s}"))?;
+                Ok(CpuAllocation::Count(count))
+            }
+        }
+    }
+}
+
+impl Serialize for CpuAllocation {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match self {
+            CpuAllocation::All => serializer.serialize_str("all"),
+            CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
+            CpuAllocation::Range(start, end) => {
+                serializer.serialize_str(&format!("{start}..{end}"))
+            }
+            CpuAllocation::NumaAware(numa) => {
+                if numa.nodes.is_empty() && numa.cores_per_node == 0 {
+                    serializer.serialize_str("numa:auto")
+                } else {
+                    let nodes_str = numa
+                        .nodes
+                        .iter()
+                        .map(|n| n.to_string())
+                        .collect::<Vec<_>>()
+                        .join(",");
+
+                    let full_str = format!(
+                        "numa:nodes={};cores={};no_ht={}",
+                        nodes_str, numa.cores_per_node, numa.avoid_hyperthread
+                    );
+
+                    serializer.serialize_str(&full_str)
+                }
+            }
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for CpuAllocation {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        #[derive(Deserialize)]
+        #[serde(untagged)]
+        enum CpuAllocationHelper {
+            String(String),
+            Number(usize),
+        }
+
+        match CpuAllocationHelper::deserialize(deserializer)? {
+            CpuAllocationHelper::String(s) => {
+                CpuAllocation::from_str(&s).map_err(serde::de::Error::custom)
+            }
+            CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_all() {
+        assert_eq!(CpuAllocation::from_str("all").unwrap(), 
CpuAllocation::All);
+    }
+
+    #[test]
+    fn test_parse_count() {
+        assert_eq!(
+            CpuAllocation::from_str("4").unwrap(),
+            CpuAllocation::Count(4)
+        );
+    }
+
+    #[test]
+    fn test_parse_range() {
+        assert_eq!(
+            CpuAllocation::from_str("2..8").unwrap(),
+            CpuAllocation::Range(2, 8)
+        );
+    }
+
+    #[test]
+    fn test_parse_numa_auto() {
+        let result = CpuAllocation::from_str("numa:auto").unwrap();
+        match result {
+            CpuAllocation::NumaAware(numa) => {
+                assert!(numa.nodes.is_empty());
+                assert_eq!(numa.cores_per_node, 0);
+                assert!(numa.avoid_hyperthread);
+            }
+            _ => panic!("Expected NumaAware"),
+        }
+    }
+
+    #[test]
+    fn test_parse_numa_explicit() {
+        let result = 
CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap();
+        match result {
+            CpuAllocation::NumaAware(numa) => {
+                assert_eq!(numa.nodes, vec![0, 1]);
+                assert_eq!(numa.cores_per_node, 4);
+                assert!(numa.avoid_hyperthread);
+            }
+            _ => panic!("Expected NumaAware"),
+        }
+    }
+
+    #[test]
+    fn test_numa_explicit_serde_roundtrip() {
+        let original = CpuAllocation::NumaAware(NumaConfig {
+            nodes: vec![0, 1],
+            cores_per_node: 4,
+            avoid_hyperthread: true,
+        });
+        let serialized = serde_json::to_string(&original).unwrap();
+        let deserialized: CpuAllocation = 
serde_json::from_str(&serialized).unwrap();
+        assert_eq!(original, deserialized);
+    }
+}
diff --git a/core/server/src/configs/system.rs 
b/core/configs/src/server_config/system.rs
similarity index 99%
rename from core/server/src/configs/system.rs
rename to core/configs/src/server_config/system.rs
index 58de89ed5..91d9a99ca 100644
--- a/core/server/src/configs/system.rs
+++ b/core/configs/src/server_config/system.rs
@@ -17,8 +17,8 @@
  */
 
 use super::cache_indexes::CacheIndexesConfig;
+use super::server::MemoryPoolConfig;
 use super::sharding::ShardingConfig;
-use crate::configs::server::MemoryPoolConfig;
 use configs::ConfigEnv;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyError;
diff --git a/core/server/src/configs/tcp.rs 
b/core/configs/src/server_config/tcp.rs
similarity index 100%
rename from core/server/src/configs/tcp.rs
rename to core/configs/src/server_config/tcp.rs
diff --git a/core/server/src/configs/validators.rs 
b/core/configs/src/server_config/validators.rs
similarity index 92%
rename from core/server/src/configs/validators.rs
rename to core/configs/src/server_config/validators.rs
index b9d70aaf3..6a49157a8 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/configs/src/server_config/validators.rs
@@ -17,18 +17,16 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use super::cluster::ClusterConfig;
 use super::server::{
     DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, 
TelemetryConfig,
 };
+use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig};
 use super::sharding::{CpuAllocation, ShardingConfig};
+use super::system::SegmentConfig;
 use super::system::{CompressionConfig, LoggingConfig, PartitionConfig};
-use crate::configs::COMPONENT;
-use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig, 
ServerConfig};
-use crate::configs::sharding::NumaTopology;
-use crate::configs::system::SegmentConfig;
-use crate::streaming::segments::*;
-use configs::ConfigurationError;
+use crate::ConfigurationError;
 use err_trail::ErrContext;
 use iggy_common::CompressionAlgorithm;
 use iggy_common::IggyExpiry;
@@ -37,6 +35,9 @@ use iggy_common::Validatable;
 use std::thread::available_parallelism;
 use tracing::warn;
 
+/// 1 GiB max segment size. Canonical definition; re-exported by core/server 
streaming.
+pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
+
 impl Validatable<ConfigurationError> for ServerConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         self.system
@@ -339,7 +340,10 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
 impl Validatable<ConfigurationError> for ShardingConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         let available_cpus = available_parallelism()
-            .expect("Failed to get number of CPU cores")
+            .map_err(|_| {
+                eprintln!("Failed to detect available CPU cores");
+                ConfigurationError::InvalidConfigurationValue
+            })?
             .get();
 
         match &self.cpu_allocation {
@@ -372,18 +376,9 @@ impl Validatable<ConfigurationError> for ShardingConfig {
                 }
                 Ok(())
             }
-            CpuAllocation::NumaAware(numa_config) => match 
NumaTopology::detect() {
-                // TODO: dry the validation, already validate it from the 
shard allocation
-                Ok(topology) => numa_config.validate(&topology).map_err(|e| {
-                    eprintln!("Invalid NUMA configuration: {}", e);
-                    ConfigurationError::InvalidConfigurationValue
-                }),
-                Err(e) => {
-                    eprintln!("Failed to detect NUMA topology: {}", e);
-                    eprintln!("NUMA allocation requested but system doesn't 
support it");
-                    Err(ConfigurationError::InvalidConfigurationValue)
-                }
-            },
+            // NUMA topology validation requires hwlocality (runtime dep).
+            // Full NUMA validation happens in server::shard_allocator at 
startup.
+            CpuAllocation::NumaAware(_) => Ok(()),
         }
     }
 }
@@ -394,19 +389,16 @@ impl Validatable<ConfigurationError> for ClusterConfig {
             return Ok(());
         }
 
-        // Validate cluster name is not empty
         if self.name.trim().is_empty() {
             eprintln!("Invalid cluster configuration: cluster name cannot be 
empty");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
-        // Validate current node name is not empty
         if self.node.current.name.trim().is_empty() {
             eprintln!("Invalid cluster configuration: current node name cannot 
be empty");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
-        // Check for duplicate node names among other nodes
         let mut node_names = std::collections::HashSet::new();
         node_names.insert(self.node.current.name.clone());
 
@@ -420,16 +412,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
             }
         }
 
-        // Validate each other node configuration
         let mut used_endpoints = std::collections::HashSet::new();
         for node in &self.node.others {
-            // Validate node name is not empty
             if node.name.trim().is_empty() {
                 eprintln!("Invalid cluster configuration: node name cannot be 
empty");
                 return Err(ConfigurationError::InvalidConfigurationValue);
             }
 
-            // Validate IP is not empty
             if node.ip.trim().is_empty() {
                 eprintln!(
                     "Invalid cluster configuration: IP cannot be empty for 
node '{}'",
@@ -438,7 +427,6 @@ impl Validatable<ConfigurationError> for ClusterConfig {
                 return Err(ConfigurationError::InvalidConfigurationValue);
             }
 
-            // Validate transport ports if provided
             let port_list = [
                 ("TCP", node.ports.tcp),
                 ("QUIC", node.ports.quic),
@@ -456,7 +444,6 @@ impl Validatable<ConfigurationError> for ClusterConfig {
                         return 
Err(ConfigurationError::InvalidConfigurationValue);
                     }
 
-                    // Check for port conflicts across nodes on the same IP
                     let endpoint = format!("{}:{}:{}", node.ip, name, port);
                     if !used_endpoints.insert(endpoint.clone()) {
                         eprintln!(
diff --git a/core/server/src/configs/websocket.rs 
b/core/configs/src/server_config/websocket.rs
similarity index 100%
rename from core/server/src/configs/websocket.rs
rename to core/configs/src/server_config/websocket.rs
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 057ae6281..39c3c5de1 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -54,13 +54,11 @@ ctrlc = { workspace = true }
 cyper = { workspace = true }
 cyper-axum = { workspace = true }
 dashmap = { workspace = true }
-derive_more = { workspace = true }
 dotenvy = { workspace = true }
 enum_dispatch = { workspace = true }
 err_trail = { workspace = true }
 error_set = { workspace = true }
 figlet-rs = { workspace = true }
-figment = { workspace = true }
 flume = { workspace = true }
 fs2 = { workspace = true }
 futures = { workspace = true }
@@ -90,10 +88,8 @@ rustls = { workspace = true }
 rustls-pemfile = { workspace = true }
 send_wrapper = { workspace = true }
 serde = { workspace = true }
-serde_with = { workspace = true }
 slab = { workspace = true }
 socket2 = { workspace = true }
-static-toml = { workspace = true }
 strum = { workspace = true }
 sysinfo = { workspace = true }
 tempfile = { workspace = true }
@@ -115,5 +111,4 @@ hwlocality = { workspace = true }
 hwlocality = { workspace = true, features = ["vendored"] }
 
 [build-dependencies]
-figment = { workspace = true, features = ["json", "toml", "env"] }
 vergen-git2 = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 3d95fb9c3..02bd742f2 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -23,7 +23,6 @@ use crate::{
     configs::{
         cache_indexes::CacheIndexesConfig,
         server::ServerConfig,
-        sharding::ShardInfo,
         system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
     },
     io::fs_utils::{self, DirEntry},
@@ -36,6 +35,7 @@ use crate::{
             frame::ShardFrame,
         },
     },
+    shard_allocator::ShardInfo,
     state::system::{StreamState, TopicState, UserState},
     streaming::{
         partitions::{
diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs.rs
similarity index 76%
rename from core/server/src/configs/mod.rs
rename to core/server/src/configs.rs
index 8bee32c44..dac4d0953 100644
--- a/core/server/src/configs/mod.rs
+++ b/core/server/src/configs.rs
@@ -17,17 +17,7 @@
  * under the License.
  */
 
-pub mod cache_indexes;
-pub mod cluster;
-pub mod defaults;
-pub mod displays;
-pub mod http;
-pub mod quic;
-pub mod server;
-pub mod sharding;
-pub mod system;
-pub mod tcp;
-pub mod validators;
-pub mod websocket;
-
-pub const COMPONENT: &str = "CONFIG";
+pub use configs::{
+    COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server, 
sharding, system,
+    tcp, validators, websocket,
+};
diff --git a/core/server/src/configs/sharding.rs 
b/core/server/src/configs/sharding.rs
deleted file mode 100644
index 5224c0b47..000000000
--- a/core/server/src/configs/sharding.rs
+++ /dev/null
@@ -1,637 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use hwlocality::Topology;
-use hwlocality::bitmap::SpecializedBitmapRef;
-use hwlocality::cpu::cpuset::CpuSet;
-use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
-use hwlocality::object::types::ObjectType::{self, NUMANode};
-#[cfg(target_os = "linux")]
-use nix::{sched::sched_setaffinity, unistd::Pid};
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use std::collections::HashSet;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::thread::available_parallelism;
-use tracing::info;
-
-use crate::server_error::ServerError;
-use configs::ConfigEnv;
-
-#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
-pub struct ShardingConfig {
-    #[serde(default)]
-    #[config_env(leaf)]
-    pub cpu_allocation: CpuAllocation,
-}
-
-#[derive(Debug, Clone, PartialEq, Default)]
-pub enum CpuAllocation {
-    #[default]
-    All,
-    Count(usize),
-    Range(usize, usize),
-    NumaAware(NumaConfig),
-}
-
-/// NUMA specific configuration
-#[derive(Debug, Clone, PartialEq, Default)]
-pub struct NumaConfig {
-    /// Which NUMA nodes to use (empty = auto-detect all)
-    pub nodes: Vec<usize>,
-    /// Cores per node to use (0 = use all available)
-    pub cores_per_node: usize,
-    /// Skip hyperthread siblings
-    pub avoid_hyperthread: bool,
-}
-
-impl NumaConfig {
-    pub fn validate(&self, topology: &NumaTopology) -> Result<(), ServerError> 
{
-        let available_nodes = topology.node_count;
-
-        if available_nodes == 0 {
-            return Err(ServerError::NoNumaNodes);
-        }
-
-        for &node in &self.nodes {
-            if node >= available_nodes {
-                return Err(ServerError::InvalidNode {
-                    requested: node,
-                    available: available_nodes,
-                });
-            }
-        }
-
-        // Validate cores per node
-        if self.cores_per_node > 0 {
-            for &node in &self.nodes {
-                let available_cores = if self.avoid_hyperthread {
-                    topology.physical_cores_for_node(node)
-                } else {
-                    topology.logical_cores_for_node(node)
-                };
-
-                info!(
-                    "core_per_node: {}, available_cores: {}",
-                    self.cores_per_node, available_cores
-                );
-
-                if self.cores_per_node > available_cores {
-                    return Err(ServerError::InsufficientCores {
-                        requested: self.cores_per_node,
-                        available: available_cores,
-                        node,
-                    });
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl CpuAllocation {
-    fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
-        let params = s
-            .strip_prefix("numa:")
-            .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;
-
-        if params == "auto" {
-            return Ok(CpuAllocation::NumaAware(NumaConfig {
-                nodes: vec![],
-                cores_per_node: 0,
-                avoid_hyperthread: true,
-            }));
-        }
-
-        let mut nodes = Vec::new();
-        let mut cores_per_node = 0;
-        let mut avoid_hyperthread = true;
-
-        for param in params.split(';') {
-            let kv: Vec<&str> = param.split('=').collect();
-            if kv.len() != 2 {
-                return Err(format!(
-                    "Invalid NUMA parameter: '{param}', only available: 'auto'"
-                ));
-            }
-
-            match kv[0] {
-                "nodes" => {
-                    nodes = kv[1]
-                        .split(',')
-                        .map(|n| {
-                            n.parse::<usize>()
-                                .map_err(|_| format!("Invalid node number: 
{n}"))
-                        })
-                        .collect::<Result<Vec<_>, _>>()?;
-                }
-                "cores" => {
-                    cores_per_node = kv[1]
-                        .parse::<usize>()
-                        .map_err(|_| format!("Invalid cores value: {}", 
kv[1]))?;
-                }
-                "no_ht" => {
-                    avoid_hyperthread = kv[1]
-                        .parse::<bool>()
-                        .map_err(|_| format!("Invalid no ht value: {}", 
kv[1]))?;
-                }
-                _ => {
-                    return Err(format!(
-                        "Unknown NUMA parameter: {}, example: 
numa:nodes=0;cores=4;no_ht=true",
-                        kv[0]
-                    ));
-                }
-            }
-        }
-
-        Ok(CpuAllocation::NumaAware(NumaConfig {
-            nodes,
-            cores_per_node,
-            avoid_hyperthread,
-        }))
-    }
-}
-
-#[derive(Debug)]
-pub struct NumaTopology {
-    topology: Topology,
-    node_count: usize,
-    physical_cores_per_node: Vec<usize>,
-    logical_cores_per_node: Vec<usize>,
-}
-
-impl NumaTopology {
-    pub fn detect() -> Result<NumaTopology, ServerError> {
-        let topology =
-            Topology::new().map_err(|e| ServerError::TopologyDetection { msg: 
e.to_string() })?;
-
-        let numa_nodes: Vec<_> = 
topology.objects_with_type(NUMANode).collect();
-
-        let node_count = numa_nodes.len();
-
-        if node_count == 0 {
-            return Err(ServerError::NoNumaNodes);
-        }
-
-        let mut physical_cores_per_node = Vec::new();
-        let mut logical_cores_per_node = Vec::new();
-
-        for node in numa_nodes {
-            let cpuset = node.cpuset().ok_or(ServerError::TopologyDetection {
-                msg: "NUMA node has no CPU set".to_string(),
-            })?;
-
-            let logical_cores = cpuset.weight().unwrap_or(0);
-
-            let physical_cores = topology
-                .objects_with_type(ObjectType::Core)
-                .filter(|core| {
-                    if let Some(core_cpuset) = core.cpuset() {
-                        !(cpuset & core_cpuset).is_empty()
-                    } else {
-                        false
-                    }
-                })
-                .count();
-
-            physical_cores_per_node.push(physical_cores);
-            logical_cores_per_node.push(logical_cores);
-        }
-
-        Ok(Self {
-            topology,
-            node_count,
-            physical_cores_per_node,
-            logical_cores_per_node,
-        })
-    }
-
-    pub fn physical_cores_for_node(&self, node: usize) -> usize {
-        self.physical_cores_per_node.get(node).copied().unwrap_or(0)
-    }
-
-    pub fn logical_cores_for_node(&self, node: usize) -> usize {
-        self.logical_cores_per_node.get(node).copied().unwrap_or(0)
-    }
-
-    fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet {
-        let mut physical_cpuset = CpuSet::new();
-        for core in self.topology.objects_with_type(ObjectType::Core) {
-            if let Some(core_cpuset) = core.cpuset() {
-                let intersection = node_cpuset.clone() & core_cpuset;
-                if !intersection.is_empty() {
-                    // Take the minimum (first) CPU ID for consistency
-                    if let Some(first_cpu) = intersection.iter_set().min() {
-                        physical_cpuset.set(first_cpu)
-                    }
-                }
-            }
-        }
-        physical_cpuset
-    }
-
-    /// Get CPU set for a NUMA node
-    fn get_cpuset_for_node(
-        &self,
-        node_id: usize,
-        avoid_hyperthread: bool,
-    ) -> Result<CpuSet, ServerError> {
-        let node = self
-            .topology
-            .objects_with_type(ObjectType::NUMANode)
-            .nth(node_id)
-            .ok_or(ServerError::InvalidNode {
-                requested: node_id,
-                available: self.node_count,
-            })?;
-
-        let cpuset_ref = node.cpuset().ok_or(ServerError::TopologyDetection {
-            msg: format!("Node {} has no CPU set", node_id),
-        })?;
-
-        let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref);
-
-        if avoid_hyperthread {
-            Ok(self.filter_physical_cores(cpuset))
-        } else {
-            Ok(cpuset)
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct ShardInfo {
-    /// CPUs this shard should use
-    pub cpu_set: HashSet<usize>,
-    /// NUMA node
-    pub numa_node: Option<usize>,
-}
-
-impl ShardInfo {
-    pub fn bind_cpu(&self) -> Result<(), ServerError> {
-        #[cfg(target_os = "linux")]
-        {
-            if self.cpu_set.is_empty() {
-                return Ok(());
-            }
-
-            let mut cpuset = nix::sched::CpuSet::new();
-            for &cpu in &self.cpu_set {
-                cpuset.set(cpu).map_err(|_| ServerError::BindingFailed)?;
-            }
-
-            sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| {
-                tracing::error!("Failed to set CPU affinity: {:?}", e);
-                ServerError::BindingFailed
-            })?;
-
-            info!("Thread bound to CPUs: {:?}", self.cpu_set);
-        }
-
-        #[cfg(not(target_os = "linux"))]
-        {
-            tracing::debug!("CPU affinity binding skipped on non-Linux 
platform");
-        }
-
-        Ok(())
-    }
-
-    pub fn bind_memory(&self) -> Result<(), ServerError> {
-        if let Some(node_id) = self.numa_node {
-            let topology = Topology::new().map_err(|err| 
ServerError::TopologyDetection {
-                msg: err.to_string(),
-            })?;
-
-            let node = topology
-                .objects_with_type(ObjectType::NUMANode)
-                .nth(node_id)
-                .ok_or(ServerError::InvalidNode {
-                    requested: node_id,
-                    available: 
topology.objects_with_type(ObjectType::NUMANode).count(),
-                })?;
-
-            if let Some(nodeset) = node.nodeset() {
-                topology
-                    .bind_memory(
-                        nodeset,
-                        MemoryBindingPolicy::Bind,
-                        MemoryBindingFlags::THREAD | 
MemoryBindingFlags::STRICT,
-                    )
-                    .map_err(|err| {
-                        tracing::error!("Failed to bind memory {:?}", err);
-                        ServerError::BindingFailed
-                    })?;
-
-                info!("Memory bound to NUMA node {node_id}");
-            }
-        }
-
-        Ok(())
-    }
-}
-
-pub struct ShardAllocator {
-    allocation: CpuAllocation,
-    topology: Option<Arc<NumaTopology>>,
-}
-
-impl ShardAllocator {
-    pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator, 
ServerError> {
-        let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) {
-            let numa_topology = NumaTopology::detect()?;
-
-            Some(Arc::new(numa_topology))
-        } else {
-            None
-        };
-
-        Ok(Self {
-            allocation: allocation.clone(),
-            topology,
-        })
-    }
-
-    pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, ServerError> {
-        match &self.allocation {
-            CpuAllocation::All => {
-                let available_cpus = available_parallelism()
-                    .map_err(|err| ServerError::Other {
-                        msg: format!("Failed to get available_parallelism: 
{:?}", err),
-                    })?
-                    .get();
-
-                let shard_assignments: Vec<_> = (0..available_cpus)
-                    .map(|cpu_id| ShardInfo {
-                        cpu_set: HashSet::from([cpu_id]),
-                        numa_node: None,
-                    })
-                    .collect();
-
-                info!(
-                    "Using all available CPU cores ({} shards with affinity)",
-                    shard_assignments.len()
-                );
-
-                Ok(shard_assignments)
-            }
-            CpuAllocation::Count(count) => {
-                let shard_assignments = (0..*count)
-                    .map(|cpu_id| ShardInfo {
-                        cpu_set: HashSet::from([cpu_id]),
-                        numa_node: None,
-                    })
-                    .collect();
-
-                info!("Using {count} shards with affinity to cores 
0..{count}");
-
-                Ok(shard_assignments)
-            }
-            CpuAllocation::Range(start, end) => {
-                let shard_assignments = (*start..*end)
-                    .map(|cpu_id| ShardInfo {
-                        cpu_set: HashSet::from([cpu_id]),
-                        numa_node: None,
-                    })
-                    .collect();
-
-                info!(
-                    "Using {} shards with affinity to cores {start}..{end}",
-                    end - start
-                );
-
-                Ok(shard_assignments)
-            }
-            CpuAllocation::NumaAware(numa_config) => {
-                let topology = 
self.topology.as_ref().ok_or(ServerError::NoTopology)?;
-                self.compute_numa_assignments(topology, numa_config)
-            }
-        }
-    }
-
-    fn compute_numa_assignments(
-        &self,
-        topology: &NumaTopology,
-        numa: &NumaConfig,
-    ) -> Result<Vec<ShardInfo>, ServerError> {
-        // Determine which nodes to use
-        let nodes = if numa.nodes.is_empty() {
-            // Auto: use all nodes
-            (0..topology.node_count).collect()
-        } else {
-            numa.nodes.clone()
-        };
-
-        // Determine cores per node
-        let cores_per_node = if numa.cores_per_node == 0 {
-            // Auto: use all available
-            if numa.avoid_hyperthread {
-                topology.physical_cores_for_node(nodes[0])
-            } else {
-                topology.logical_cores_for_node(nodes[0])
-            }
-        } else {
-            numa.cores_per_node
-        };
-
-        let mut shard_infos = Vec::new();
-
-        let node_cpus: Vec<Vec<usize>> = nodes
-            .iter()
-            .map(|&node_id| {
-                let cpuset = topology.get_cpuset_for_node(node_id, 
numa.avoid_hyperthread)?;
-                Ok(cpuset.iter_set().map(usize::from).collect())
-            })
-            .collect::<Result<_, ServerError>>()?;
-
-        // For each node, create one shard per core (thread-per-core)
-        for (idx, &node_id) in nodes.iter().enumerate() {
-            let available_cpus = &node_cpus[idx];
-
-            // Take first core_per_node CPUs from this node
-            let cores_to_use: Vec<usize> = available_cpus
-                .iter()
-                .take(cores_per_node)
-                .copied()
-                .collect();
-
-            if cores_to_use.len() < cores_per_node {
-                return Err(ServerError::InsufficientCores {
-                    requested: cores_per_node,
-                    available: available_cpus.len(),
-                    node: node_id,
-                });
-            }
-
-            // Create one shard per core
-            for cpu_id in cores_to_use {
-                shard_infos.push(ShardInfo {
-                    cpu_set: HashSet::from([cpu_id]),
-                    numa_node: Some(node_id),
-                });
-            }
-        }
-
-        info!(
-            "Using {} shards with {} NUMA node, {} cores per node, and avoid 
hyperthread {}",
-            shard_infos.len(),
-            nodes.len(),
-            cores_per_node,
-            numa.avoid_hyperthread
-        );
-
-        Ok(shard_infos)
-    }
-}
-
-impl FromStr for CpuAllocation {
-    type Err = String;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "all" => Ok(CpuAllocation::All),
-            s if s.starts_with("numa:") => Self::parse_numa(s),
-            s if s.contains("..") => {
-                let parts: Vec<&str> = s.split("..").collect();
-                if parts.len() != 2 {
-                    return Err(format!("Invalid range format: {s}. Expected 
'start..end'"));
-                }
-                let start = parts[0]
-                    .parse::<usize>()
-                    .map_err(|_| format!("Invalid start value: {}", 
parts[0]))?;
-                let end = parts[1]
-                    .parse::<usize>()
-                    .map_err(|_| format!("Invalid end value: {}", parts[1]))?;
-                Ok(CpuAllocation::Range(start, end))
-            }
-            s => {
-                let count = s
-                    .parse::<usize>()
-                    .map_err(|_| format!("Invalid shard count: {s}"))?;
-                Ok(CpuAllocation::Count(count))
-            }
-        }
-    }
-}
-
-impl Serialize for CpuAllocation {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        match self {
-            CpuAllocation::All => serializer.serialize_str("all"),
-            CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
-            CpuAllocation::Range(start, end) => {
-                serializer.serialize_str(&format!("{start}..{end}"))
-            }
-            CpuAllocation::NumaAware(numa) => {
-                if numa.nodes.is_empty() && numa.cores_per_node == 0 {
-                    serializer.serialize_str("numa:auto")
-                } else {
-                    let nodes_str = numa
-                        .nodes
-                        .iter()
-                        .map(|n| n.to_string())
-                        .collect::<Vec<_>>()
-                        .join(",");
-
-                    let full_str = format!(
-                        "numa:nodes={};cores:{};no_ht={}",
-                        nodes_str, numa.cores_per_node, numa.avoid_hyperthread
-                    );
-
-                    serializer.serialize_str(&full_str)
-                }
-            }
-        }
-    }
-}
-
-impl<'de> Deserialize<'de> for CpuAllocation {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: Deserializer<'de>,
-    {
-        #[derive(Deserialize)]
-        #[serde(untagged)]
-        enum CpuAllocationHelper {
-            String(String),
-            Number(usize),
-        }
-
-        match CpuAllocationHelper::deserialize(deserializer)? {
-            CpuAllocationHelper::String(s) => {
-                CpuAllocation::from_str(&s).map_err(serde::de::Error::custom)
-            }
-            CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_parse_all() {
-        assert_eq!(CpuAllocation::from_str("all").unwrap(), 
CpuAllocation::All);
-    }
-
-    #[test]
-    fn test_parse_count() {
-        assert_eq!(
-            CpuAllocation::from_str("4").unwrap(),
-            CpuAllocation::Count(4)
-        );
-    }
-
-    #[test]
-    fn test_parse_range() {
-        assert_eq!(
-            CpuAllocation::from_str("2..8").unwrap(),
-            CpuAllocation::Range(2, 8)
-        );
-    }
-
-    #[test]
-    fn test_parse_numa_auto() {
-        let result = CpuAllocation::from_str("numa:auto").unwrap();
-        match result {
-            CpuAllocation::NumaAware(numa) => {
-                assert!(numa.nodes.is_empty());
-                assert_eq!(numa.cores_per_node, 0);
-                assert!(numa.avoid_hyperthread);
-            }
-            _ => panic!("Expected NumaAware"),
-        }
-    }
-
-    #[test]
-    fn test_parse_numa_explicit() {
-        let result = 
CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap();
-        match result {
-            CpuAllocation::NumaAware(numa) => {
-                assert_eq!(numa.nodes, vec![0, 1]);
-                assert_eq!(numa.cores_per_node, 4);
-                assert!(numa.avoid_hyperthread);
-            }
-            _ => panic!("Expected NumaAware"),
-        }
-    }
-}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 1fc0d0d3f..b4bac970d 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -42,6 +42,7 @@ pub mod metadata;
 pub mod quic;
 pub mod server_error;
 pub mod shard;
+pub mod shard_allocator;
 pub mod state;
 pub mod streaming;
 pub mod tcp;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index d3ce11f49..6a7fc1e06 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -32,7 +32,6 @@ use server::bootstrap::{
     create_directories, create_shard_connections, create_shard_executor, 
load_config,
     load_metadata, resolve_persister, update_system_info,
 };
-use server::configs::sharding::ShardAllocator;
 use server::diagnostics::{print_io_uring_permission_info, 
print_locked_memory_limit_info};
 use server::io::fs_utils;
 use server::log::logger::Logging;
@@ -40,6 +39,7 @@ use server::metadata::{Metadata, create_metadata_handles};
 use server::server_error::ServerError;
 use server::shard::system::info::SystemInfo;
 use server::shard::{IggyShard, calculate_shard_assignment};
+use server::shard_allocator::ShardAllocator;
 use server::state::file::FileState;
 use server::state::system::SystemState;
 use server::streaming::clients::client_manager::{Client, ClientManager};
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index 74dd71118..6f0124eb2 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -37,34 +37,8 @@ error_set!(
     }
 
     NumaError := {
-        #[display("Failed to detect topology: {}", msg)]
-        TopologyDetection {
-            msg: String
-        },
-
-        #[display("There is no NUMA node on this server")]
-        NoNumaNodes,
-
-        #[display("No Topology")]
-        NoTopology,
-
-        #[display("Binding Failed")]
-        BindingFailed,
-
-        #[display("Insufficient cores on node {}: requested {}, only {} 
available", node, requested, available)]
-        InsufficientCores {
-            requested: usize,
-            available: usize,
-            node: usize,
-        },
-
-        #[display("Invalid NUMA node: requested {}, only available {} node", 
requested, available)]
-        InvalidNode { requested: usize, available: usize },
-
-        #[display("Other error: {}", msg)]
-        Other {
-            msg: String
-        },
+        #[display("{0}")]
+        Sharding(crate::shard_allocator::ShardingError),
     }
 
     ConfigurationError := {
diff --git a/core/server/src/shard_allocator.rs 
b/core/server/src/shard_allocator.rs
new file mode 100644
index 000000000..334c09f28
--- /dev/null
+++ b/core/server/src/shard_allocator.rs
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use configs::sharding::{CpuAllocation, NumaConfig};
+use hwlocality::Topology;
+use hwlocality::bitmap::SpecializedBitmapRef;
+use hwlocality::cpu::cpuset::CpuSet;
+use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
+use hwlocality::object::types::ObjectType::{self, NUMANode};
+#[cfg(target_os = "linux")]
+use nix::{sched::sched_setaffinity, unistd::Pid};
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::thread::available_parallelism;
+use tracing::info;
+
+#[derive(Debug, thiserror::Error)]
+pub enum ShardingError {
+    #[error("Failed to detect topology: {msg}")]
+    TopologyDetection { msg: String },
+
+    #[error("There is no NUMA node on this server")]
+    NoNumaNodes,
+
+    #[error("No Topology")]
+    NoTopology,
+
+    #[error("Binding Failed")]
+    BindingFailed,
+
+    #[error("Insufficient cores on node {node}: requested {requested}, only 
{available} available")]
+    InsufficientCores {
+        requested: usize,
+        available: usize,
+        node: usize,
+    },
+
+    #[error("Invalid NUMA node: requested {requested}, only available 
{available} node")]
+    InvalidNode { requested: usize, available: usize },
+
+    #[error("Other error: {msg}")]
+    Other { msg: String },
+}
+
+#[derive(Debug)]
+pub struct NumaTopology {
+    topology: Topology,
+    node_count: usize,
+    physical_cores_per_node: Vec<usize>,
+    logical_cores_per_node: Vec<usize>,
+}
+
+impl NumaTopology {
+    pub fn detect() -> Result<NumaTopology, ShardingError> {
+        let topology =
+            Topology::new().map_err(|e| ShardingError::TopologyDetection { 
msg: e.to_string() })?;
+
+        let numa_nodes: Vec<_> = 
topology.objects_with_type(NUMANode).collect();
+
+        let node_count = numa_nodes.len();
+
+        if node_count == 0 {
+            return Err(ShardingError::NoNumaNodes);
+        }
+
+        let mut physical_cores_per_node = Vec::new();
+        let mut logical_cores_per_node = Vec::new();
+
+        for node in numa_nodes {
+            let cpuset = node.cpuset().ok_or(ShardingError::TopologyDetection {
+                msg: "NUMA node has no CPU set".to_string(),
+            })?;
+
+            let logical_cores = cpuset.weight().unwrap_or(0);
+
+            let physical_cores = topology
+                .objects_with_type(ObjectType::Core)
+                .filter(|core| {
+                    if let Some(core_cpuset) = core.cpuset() {
+                        !(cpuset & core_cpuset).is_empty()
+                    } else {
+                        false
+                    }
+                })
+                .count();
+
+            physical_cores_per_node.push(physical_cores);
+            logical_cores_per_node.push(logical_cores);
+        }
+
+        Ok(Self {
+            topology,
+            node_count,
+            physical_cores_per_node,
+            logical_cores_per_node,
+        })
+    }
+
+    pub fn physical_cores_for_node(&self, node: usize) -> usize {
+        self.physical_cores_per_node.get(node).copied().unwrap_or(0)
+    }
+
+    pub fn logical_cores_for_node(&self, node: usize) -> usize {
+        self.logical_cores_per_node.get(node).copied().unwrap_or(0)
+    }
+
+    fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet {
+        let mut physical_cpuset = CpuSet::new();
+        for core in self.topology.objects_with_type(ObjectType::Core) {
+            if let Some(core_cpuset) = core.cpuset() {
+                let intersection = node_cpuset.clone() & core_cpuset;
+                if !intersection.is_empty()
+                    && let Some(first_cpu) = intersection.iter_set().min()
+                {
+                    physical_cpuset.set(first_cpu)
+                }
+            }
+        }
+        physical_cpuset
+    }
+
+    fn get_cpuset_for_node(
+        &self,
+        node_id: usize,
+        avoid_hyperthread: bool,
+    ) -> Result<CpuSet, ShardingError> {
+        let node = self
+            .topology
+            .objects_with_type(ObjectType::NUMANode)
+            .nth(node_id)
+            .ok_or(ShardingError::InvalidNode {
+                requested: node_id,
+                available: self.node_count,
+            })?;
+
+        let cpuset_ref = node.cpuset().ok_or(ShardingError::TopologyDetection {
+            msg: format!("Node {} has no CPU set", node_id),
+        })?;
+
+        let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref);
+
+        if avoid_hyperthread {
+            Ok(self.filter_physical_cores(cpuset))
+        } else {
+            Ok(cpuset)
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ShardInfo {
+    pub cpu_set: HashSet<usize>,
+    pub numa_node: Option<usize>,
+}
+
+impl ShardInfo {
+    pub fn bind_cpu(&self) -> Result<(), ShardingError> {
+        #[cfg(target_os = "linux")]
+        {
+            if self.cpu_set.is_empty() {
+                return Ok(());
+            }
+
+            let mut cpuset = nix::sched::CpuSet::new();
+            for &cpu in &self.cpu_set {
+                cpuset.set(cpu).map_err(|_| ShardingError::BindingFailed)?;
+            }
+
+            sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| {
+                tracing::error!("Failed to set CPU affinity: {:?}", e);
+                ShardingError::BindingFailed
+            })?;
+
+            info!("Thread bound to CPUs: {:?}", self.cpu_set);
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            tracing::debug!("CPU affinity binding skipped on non-Linux 
platform");
+        }
+
+        Ok(())
+    }
+
+    pub fn bind_memory(&self) -> Result<(), ShardingError> {
+        if let Some(node_id) = self.numa_node {
+            let topology = Topology::new().map_err(|err| 
ShardingError::TopologyDetection {
+                msg: err.to_string(),
+            })?;
+
+            let node = topology
+                .objects_with_type(ObjectType::NUMANode)
+                .nth(node_id)
+                .ok_or(ShardingError::InvalidNode {
+                    requested: node_id,
+                    available: 
topology.objects_with_type(ObjectType::NUMANode).count(),
+                })?;
+
+            if let Some(nodeset) = node.nodeset() {
+                topology
+                    .bind_memory(
+                        nodeset,
+                        MemoryBindingPolicy::Bind,
+                        MemoryBindingFlags::THREAD | 
MemoryBindingFlags::STRICT,
+                    )
+                    .map_err(|err| {
+                        tracing::error!("Failed to bind memory {:?}", err);
+                        ShardingError::BindingFailed
+                    })?;
+
+                info!("Memory bound to NUMA node {node_id}");
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub struct ShardAllocator {
+    allocation: CpuAllocation,
+    topology: Option<Arc<NumaTopology>>,
+}
+
+impl ShardAllocator {
+    pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator, 
ShardingError> {
+        let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) {
+            let numa_topology = NumaTopology::detect()?;
+
+            Some(Arc::new(numa_topology))
+        } else {
+            None
+        };
+
+        Ok(Self {
+            allocation: allocation.clone(),
+            topology,
+        })
+    }
+
+    pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, 
ShardingError> {
+        match &self.allocation {
+            CpuAllocation::All => {
+                let available_cpus = available_parallelism()
+                    .map_err(|err| ShardingError::Other {
+                        msg: format!("Failed to get available_parallelism: {:?}", err),
+                    })?
+                    .get();
+
+                let shard_assignments: Vec<_> = (0..available_cpus)
+                    .map(|cpu_id| ShardInfo {
+                        cpu_set: HashSet::from([cpu_id]),
+                        numa_node: None,
+                    })
+                    .collect();
+
+                info!(
+                    "Using all available CPU cores ({} shards with affinity)",
+                    shard_assignments.len()
+                );
+
+                Ok(shard_assignments)
+            }
+            CpuAllocation::Count(count) => {
+                let shard_assignments = (0..*count)
+                    .map(|cpu_id| ShardInfo {
+                        cpu_set: HashSet::from([cpu_id]),
+                        numa_node: None,
+                    })
+                    .collect();
+
+                info!("Using {count} shards with affinity to cores 0..{count}");
+
+                Ok(shard_assignments)
+            }
+            CpuAllocation::Range(start, end) => {
+                let shard_assignments = (*start..*end)
+                    .map(|cpu_id| ShardInfo {
+                        cpu_set: HashSet::from([cpu_id]),
+                        numa_node: None,
+                    })
+                    .collect();
+
+                info!(
+                    "Using {} shards with affinity to cores {start}..{end}",
+                    end - start
+                );
+
+                Ok(shard_assignments)
+            }
+            CpuAllocation::NumaAware(numa_config) => {
+                let topology = self.topology.as_ref().ok_or(ShardingError::NoTopology)?;
+                self.compute_numa_assignments(topology, numa_config)
+            }
+        }
+    }
+
+    fn compute_numa_assignments(
+        &self,
+        topology: &NumaTopology,
+        numa: &NumaConfig,
+    ) -> Result<Vec<ShardInfo>, ShardingError> {
+        let nodes = if numa.nodes.is_empty() {
+            (0..topology.node_count).collect()
+        } else {
+            numa.nodes.clone()
+        };
+
+        let cores_per_node = if numa.cores_per_node == 0 {
+            if numa.avoid_hyperthread {
+                topology.physical_cores_for_node(nodes[0])
+            } else {
+                topology.logical_cores_for_node(nodes[0])
+            }
+        } else {
+            numa.cores_per_node
+        };
+
+        let mut shard_infos = Vec::new();
+
+        let node_cpus: Vec<Vec<usize>> = nodes
+            .iter()
+            .map(|&node_id| {
+                let cpuset = topology.get_cpuset_for_node(node_id, numa.avoid_hyperthread)?;
+                Ok(cpuset.iter_set().map(usize::from).collect())
+            })
+            .collect::<Result<_, ShardingError>>()?;
+
+        for (idx, &node_id) in nodes.iter().enumerate() {
+            let available_cpus = &node_cpus[idx];
+
+            let cores_to_use: Vec<usize> = available_cpus
+                .iter()
+                .take(cores_per_node)
+                .copied()
+                .collect();
+
+            if cores_to_use.len() < cores_per_node {
+                return Err(ShardingError::InsufficientCores {
+                    requested: cores_per_node,
+                    available: available_cpus.len(),
+                    node: node_id,
+                });
+            }
+
+            for cpu_id in cores_to_use {
+                shard_infos.push(ShardInfo {
+                    cpu_set: HashSet::from([cpu_id]),
+                    numa_node: Some(node_id),
+                });
+            }
+        }
+
+        info!(
+            "Using {} shards with {} NUMA node, {} cores per node, and avoid hyperthread {}",
+            shard_infos.len(),
+            nodes.len(),
+            cores_per_node,
+            numa.avoid_hyperthread
+        );
+
+        Ok(shard_infos)
+    }
+}
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 878af0eb1..4b3b883d9 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -32,4 +32,4 @@ pub use types::IggyMessageViewMut;
 pub use types::IggyMessagesBatchMut;
 pub use types::IggyMessagesBatchSet;
 
-pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
+pub use crate::configs::validators::SEGMENT_MAX_SIZE_BYTES;

Reply via email to