This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new bcfa50cf8 refactor(configs): move ServerConfig types from server to
configs crate (#2796)
bcfa50cf8 is described below
commit bcfa50cf8ff975411fb7d9bbba4ddf73f346c75d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Feb 26 18:56:43 2026 +0100
refactor(configs): move ServerConfig types from server to configs crate
(#2796)
---
Cargo.lock | 17 +-
DEPENDENCIES.md | 2 +-
core/common/src/alloc/memory_pool.rs | 3 +
core/configs/Cargo.toml | 8 +
core/configs/src/lib.rs | 6 +-
.../src/server_config}/cache_indexes.rs | 0
.../src/server_config}/cluster.rs | 0
.../src/server_config}/defaults.rs | 26 +-
.../src/server_config}/displays.rs | 8 +-
.../configs => configs/src/server_config}/http.rs | 0
.../configs => configs/src/server_config}/mod.rs | 0
.../configs => configs/src/server_config}/quic.rs | 0
.../src/server_config}/server.rs | 19 +-
core/configs/src/server_config/sharding.rs | 261 +++++++++
.../src/server_config}/system.rs | 2 +-
.../configs => configs/src/server_config}/tcp.rs | 0
.../src/server_config}/validators.rs | 41 +-
.../src/server_config}/websocket.rs | 0
core/server/Cargo.toml | 5 -
core/server/src/bootstrap.rs | 2 +-
core/server/src/{configs/mod.rs => configs.rs} | 18 +-
core/server/src/configs/sharding.rs | 637 ---------------------
core/server/src/lib.rs | 1 +
core/server/src/main.rs | 2 +-
core/server/src/server_error.rs | 30 +-
core/server/src/shard_allocator.rs | 379 ++++++++++++
core/server/src/streaming/segments/mod.rs | 2 +-
27 files changed, 717 insertions(+), 752 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 76d8605cf..124dfe960 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1936,10 +1936,18 @@ name = "configs"
version = "0.1.0"
dependencies = [
"configs_derive",
+ "derive_more",
+ "err_trail",
"figment",
+ "iggy_common",
+ "jsonwebtoken",
"serde",
"serde_json",
+ "serde_with",
+ "static-toml",
+ "strum",
"tracing",
+ "tungstenite",
]
[[package]]
@@ -3161,7 +3169,6 @@ dependencies = [
"atomic",
"pear",
"serde",
- "serde_json",
"toml 0.8.23",
"uncased",
"version_check",
@@ -5273,9 +5280,9 @@ dependencies = [
[[package]]
name = "keccak"
-version = "0.1.5"
+version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
+checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653"
dependencies = [
"cpufeatures 0.2.17",
]
@@ -8338,13 +8345,11 @@ dependencies = [
"cyper",
"cyper-axum",
"dashmap",
- "derive_more",
"dotenvy",
"enum_dispatch",
"err_trail",
"error_set",
"figlet-rs",
- "figment",
"flume 0.12.0",
"fs2",
"futures",
@@ -8375,10 +8380,8 @@ dependencies = [
"rustls-pemfile",
"send_wrapper",
"serde",
- "serde_with",
"slab",
"socket2 0.6.2",
- "static-toml",
"strum",
"sysinfo 0.38.1",
"tempfile",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index ffe0d501e..52bb36d63 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -439,7 +439,7 @@ journal: 0.1.0, "Apache-2.0",
js-sys: 0.3.85, "Apache-2.0 OR MIT",
jsonwebtoken: 10.3.0, "MIT",
jwalk: 0.8.1, "MIT",
-keccak: 0.1.5, "Apache-2.0 OR MIT",
+keccak: 0.1.6, "Apache-2.0 OR MIT",
keyring: 3.6.3, "Apache-2.0 OR MIT",
kqueue: 1.1.1, "MIT",
kqueue-sys: 1.0.4, "MIT",
diff --git a/core/common/src/alloc/memory_pool.rs
b/core/common/src/alloc/memory_pool.rs
index 8ebc892c9..9216ef57f 100644
--- a/core/common/src/alloc/memory_pool.rs
+++ b/core/common/src/alloc/memory_pool.rs
@@ -73,6 +73,9 @@ pub fn memory_pool() -> &'static MemoryPool {
.expect("Memory pool not initialized - MemoryPool::init_pool should be
called first")
}
+// TODO: Extract shared domain types (IggyByteSize, IggyDuration, etc.) into
an `iggy_types`
+// leaf crate so `iggy_common` can depend on `configs` directly. That lets us
delete this
+// duplicate and use `configs::server::MemoryPoolConfig` here instead.
/// Configuration for the memory pool.
#[derive(Debug)]
pub struct MemoryPoolConfigOther {
diff --git a/core/configs/Cargo.toml b/core/configs/Cargo.toml
index d03f44c04..ed7d6eba3 100644
--- a/core/configs/Cargo.toml
+++ b/core/configs/Cargo.toml
@@ -24,7 +24,15 @@ publish = false
[dependencies]
configs_derive = { workspace = true }
+derive_more = { workspace = true }
+err_trail = { workspace = true }
figment = { workspace = true }
+iggy_common = { workspace = true }
+jsonwebtoken = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
+serde_with = { workspace = true }
+static-toml = { workspace = true }
+strum = { workspace = true }
tracing = { workspace = true }
+tungstenite = { workspace = true }
diff --git a/core/configs/src/lib.rs b/core/configs/src/lib.rs
index d1410ab8a..419c77b8a 100644
--- a/core/configs/src/lib.rs
+++ b/core/configs/src/lib.rs
@@ -20,9 +20,13 @@
extern crate self as configs;
mod configs_impl;
-
+mod server_config;
pub use configs_derive::ConfigEnv;
pub use configs_impl::{
ConfigEnvMappings, ConfigProvider, ConfigurationError, ConfigurationType,
EnvVarMapping,
FileConfigProvider, TypedEnvProvider, parse_env_value_to_json,
};
+pub use server_config::{
+ COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server,
sharding, system,
+ tcp, validators, websocket,
+};
diff --git a/core/server/src/configs/cache_indexes.rs
b/core/configs/src/server_config/cache_indexes.rs
similarity index 100%
rename from core/server/src/configs/cache_indexes.rs
rename to core/configs/src/server_config/cache_indexes.rs
diff --git a/core/server/src/configs/cluster.rs
b/core/configs/src/server_config/cluster.rs
similarity index 100%
rename from core/server/src/configs/cluster.rs
rename to core/configs/src/server_config/cluster.rs
diff --git a/core/server/src/configs/defaults.rs
b/core/configs/src/server_config/defaults.rs
similarity index 96%
rename from core/server/src/configs/defaults.rs
rename to core/configs/src/server_config/defaults.rs
index d2f1e61af..27eae580d 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/configs/src/server_config/defaults.rs
@@ -16,35 +16,33 @@
* under the License.
*/
-use super::sharding::ShardingConfig;
-use super::tcp::TcpSocketConfig;
-use crate::configs::cluster::CurrentNodeConfig;
-use crate::configs::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig,
TransportPorts};
-use crate::configs::http::{
- HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig,
HttpTlsConfig,
-};
-use crate::configs::quic::{QuicCertificateConfig, QuicConfig,
QuicSocketConfig};
-use crate::configs::server::{
+use super::cluster::CurrentNodeConfig;
+use super::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig,
TransportPorts};
+use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig,
HttpMetricsConfig, HttpTlsConfig};
+use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig};
+use super::server::{
ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig,
MemoryPoolConfig,
MessageSaverConfig, MessagesMaintenanceConfig,
PersonalAccessTokenCleanerConfig,
PersonalAccessTokenConfig, ServerConfig, TelemetryConfig,
TelemetryLogsConfig,
TelemetryTracesConfig,
};
-use crate::configs::system::{
+use super::sharding::ShardingConfig;
+use super::system::{
BackupConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig,
LoggingConfig,
MessageDeduplicationConfig, PartitionConfig, RecoveryConfig,
RuntimeConfig, SegmentConfig,
StateConfig, StreamConfig, SystemConfig, TopicConfig,
};
-use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
-use crate::configs::websocket::{WebSocketConfig, WebSocketTlsConfig};
+use super::tcp::TcpSocketConfig;
+use super::tcp::{TcpConfig, TcpTlsConfig};
+use super::websocket::{WebSocketConfig, WebSocketTlsConfig};
use iggy_common::IggyByteSize;
use iggy_common::IggyDuration;
use std::sync::Arc;
use std::time::Duration;
static_toml::static_toml! {
- // static_toml crate always starts from CARGO_MANIFEST_DIR (core/server)
- pub static SERVER_CONFIG = include_toml!("config.toml");
+ // static_toml resolves relative to CARGO_MANIFEST_DIR (core/configs/).
+ pub static SERVER_CONFIG = include_toml!("../server/config.toml");
}
impl Default for ServerConfig {
diff --git a/core/server/src/configs/displays.rs
b/core/configs/src/server_config/displays.rs
similarity index 98%
rename from core/server/src/configs/displays.rs
rename to core/configs/src/server_config/displays.rs
index df7741564..959d35476 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/configs/src/server_config/displays.rs
@@ -17,13 +17,13 @@
* under the License.
*/
-use crate::configs::quic::{QuicCertificateConfig, QuicConfig};
-use crate::configs::server::{
+use super::quic::{QuicCertificateConfig, QuicConfig};
+use super::server::{
ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig,
MessagesMaintenanceConfig,
TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig,
};
-use crate::configs::system::MessageDeduplicationConfig;
-use crate::configs::{
+use super::system::MessageDeduplicationConfig;
+use super::{
http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig,
HttpTlsConfig},
server::{MessageSaverConfig, ServerConfig},
system::{
diff --git a/core/server/src/configs/http.rs
b/core/configs/src/server_config/http.rs
similarity index 100%
rename from core/server/src/configs/http.rs
rename to core/configs/src/server_config/http.rs
diff --git a/core/server/src/configs/mod.rs
b/core/configs/src/server_config/mod.rs
similarity index 100%
copy from core/server/src/configs/mod.rs
copy to core/configs/src/server_config/mod.rs
diff --git a/core/server/src/configs/quic.rs
b/core/configs/src/server_config/quic.rs
similarity index 100%
rename from core/server/src/configs/quic.rs
rename to core/configs/src/server_config/quic.rs
diff --git a/core/server/src/configs/server.rs
b/core/configs/src/server_config/server.rs
similarity index 93%
rename from core/server/src/configs/server.rs
rename to core/configs/src/server_config/server.rs
index 850384f9e..8c005f599 100644
--- a/core/server/src/configs/server.rs
+++ b/core/configs/src/server_config/server.rs
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-use crate::configs::COMPONENT;
-use crate::configs::cluster::ClusterConfig;
-use crate::configs::http::HttpConfig;
-use crate::configs::quic::QuicConfig;
-use crate::configs::system::SystemConfig;
-use crate::configs::tcp::TcpConfig;
-use crate::configs::websocket::WebSocketConfig;
-use crate::server_error::ConfigurationError;
+use super::COMPONENT;
+use super::cluster::ClusterConfig;
+use super::http::HttpConfig;
+use super::quic::QuicConfig;
+use super::system::SystemConfig;
+use super::tcp::TcpConfig;
+use super::websocket::WebSocketConfig;
+use crate::ConfigurationError;
use configs::{ConfigEnv, ConfigEnvMappings, ConfigProvider,
FileConfigProvider, TypedEnvProvider};
use derive_more::Display;
use err_trail::ErrContext;
@@ -66,7 +66,6 @@ pub struct MemoryPoolConfig {
pub bucket_capacity: u32,
}
-// Hack around the fact that we define our config inside of the `server`
crate, but `memory_pool` is in `common`.
impl MemoryPoolConfig {
pub fn into_other(&self) -> MemoryPoolConfigOther {
MemoryPoolConfigOther {
@@ -204,7 +203,7 @@ impl ServerConfig {
/// Create a config provider using compile-time generated env var mappings.
pub fn config_provider(config_path: &str) ->
FileConfigProvider<ServerConfigEnvProvider> {
- let default_config = Toml::string(include_str!("../../config.toml"));
+ let default_config =
Toml::string(include_str!("../../../server/config.toml"));
FileConfigProvider::new(
config_path.to_string(),
ServerConfigEnvProvider::default(),
diff --git a/core/configs/src/server_config/sharding.rs
b/core/configs/src/server_config/sharding.rs
new file mode 100644
index 000000000..084c984e1
--- /dev/null
+++ b/core/configs/src/server_config/sharding.rs
@@ -0,0 +1,261 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+use std::str::FromStr;
+
+use configs::ConfigEnv;
+
+/// Sharding settings: controls which CPU cores the server allocates to shards.
+#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
+pub struct ShardingConfig {
+    /// Which cores to use. When the key is omitted this falls back to
+    /// `CpuAllocation::All` (`#[serde(default)]` plus the enum's `#[default]` variant).
+    // NOTE(review): `#[config_env(leaf)]` presumably makes the env-var mapping treat this
+    // field as a single scalar value instead of recursing into it — confirm against the
+    // `ConfigEnv` derive.
+    #[serde(default)]
+    #[config_env(leaf)]
+    pub cpu_allocation: CpuAllocation,
+}
+
+/// Strategy for choosing the CPU cores the server runs shards on.
+///
+/// Textual forms accepted by the `FromStr` impl: `all`, a plain count such as `4`,
+/// a range `start..end`, or a `numa:` spec (`numa:auto`, `numa:nodes=0,1;cores=4;no_ht=true`).
+#[derive(Debug, Clone, PartialEq, Default)]
+pub enum CpuAllocation {
+    /// Use every available core. This is the default.
+    #[default]
+    All,
+    /// Use the given number of shards/cores.
+    Count(usize),
+    /// Use the cores between the two bounds parsed from `start..end`.
+    // NOTE(review): whether `end` is inclusive is decided by the consumer (allocator /
+    // validator), not here — confirm there.
+    Range(usize, usize),
+    /// NUMA-aware allocation driven by [`NumaConfig`].
+    NumaAware(NumaConfig),
+}
+
+/// NUMA-specific configuration for [`CpuAllocation::NumaAware`].
+///
+/// Note: the derived `Default` sets `avoid_hyperthread` to `false`, whereas parsing a
+/// `numa:` spec (including `numa:auto`) defaults it to `true` when `no_ht` is absent.
+#[derive(Debug, Clone, PartialEq, Default)]
+pub struct NumaConfig {
+    /// Which NUMA nodes to use (empty = auto-detect all). Parsed from `nodes=0,1,...`.
+    pub nodes: Vec<usize>,
+    /// Cores per node to use (0 = use all available). Parsed from `cores=N`.
+    pub cores_per_node: usize,
+    /// Skip hyperthread siblings. Parsed from `no_ht=true|false`.
+    pub avoid_hyperthread: bool,
+}
+
+impl CpuAllocation {
+    /// Parses a `numa:` allocation spec.
+    ///
+    /// Accepted forms:
+    /// - `numa:auto`: all NUMA nodes, all cores per node, hyperthread siblings skipped.
+    /// - `numa:key=value[;key=value...]` with keys:
+    ///   - `nodes`: comma-separated node ids; an empty value means "all nodes".
+    ///   - `cores`: cores per node, where `0` means all.
+    ///   - `no_ht`: `true`/`false`.
+    ///
+    ///   Keys that are not given keep the same defaults as `numa:auto`.
+    ///
+    /// # Errors
+    /// Returns a human-readable message when the `numa:` prefix is missing, a parameter
+    /// is not exactly one `key=value` pair, a key is unknown, or a value fails to parse.
+    fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
+        let params = s
+            .strip_prefix("numa:")
+            .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;
+
+        if params == "auto" {
+            return Ok(CpuAllocation::NumaAware(NumaConfig {
+                nodes: vec![],
+                cores_per_node: 0,
+                avoid_hyperthread: true,
+            }));
+        }
+
+        let mut nodes = Vec::new();
+        let mut cores_per_node = 0;
+        let mut avoid_hyperthread = true;
+
+        for param in params.split(';') {
+            // Each parameter must contain exactly one '='.
+            let (key, value) = match param.split_once('=') {
+                Some((key, value)) if !value.contains('=') => (key, value),
+                _ => {
+                    return Err(format!(
+                        "Invalid NUMA parameter: '{param}', expected 'key=value' \
+                         (keys: nodes, cores, no_ht) or 'auto'"
+                    ));
+                }
+            };
+
+            match key {
+                "nodes" => {
+                    // An empty value is an empty list, i.e. "auto-detect all nodes",
+                    // matching the documented meaning of `NumaConfig::nodes`.
+                    nodes = if value.is_empty() {
+                        Vec::new()
+                    } else {
+                        value
+                            .split(',')
+                            .map(|n| {
+                                n.parse::<usize>()
+                                    .map_err(|_| format!("Invalid node number: {n}"))
+                            })
+                            .collect::<Result<Vec<_>, _>>()?
+                    };
+                }
+                "cores" => {
+                    cores_per_node = value
+                        .parse::<usize>()
+                        .map_err(|_| format!("Invalid cores value: {value}"))?;
+                }
+                "no_ht" => {
+                    avoid_hyperthread = value
+                        .parse::<bool>()
+                        .map_err(|_| format!("Invalid no ht value: {value}"))?;
+                }
+                _ => {
+                    return Err(format!(
+                        "Unknown NUMA parameter: {key}, example: numa:nodes=0;cores=4;no_ht=true"
+                    ));
+                }
+            }
+        }
+
+        Ok(CpuAllocation::NumaAware(NumaConfig {
+            nodes,
+            cores_per_node,
+            avoid_hyperthread,
+        }))
+    }
+}
+
+impl FromStr for CpuAllocation {
+    type Err = String;
+
+    /// Parses a CPU allocation spec.
+    ///
+    /// Checks, in order: the literal `all`, a `numa:` spec, a `start..end` range, and
+    /// finally a plain shard count.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if s == "all" {
+            return Ok(CpuAllocation::All);
+        }
+        if s.starts_with("numa:") {
+            return Self::parse_numa(s);
+        }
+        if s.contains("..") {
+            // Exactly one `..` separator is allowed.
+            let (lo, hi) = match s.split_once("..") {
+                Some((lo, hi)) if !hi.contains("..") => (lo, hi),
+                _ => return Err(format!("Invalid range format: {s}. Expected 'start..end'")),
+            };
+            let start = lo
+                .parse::<usize>()
+                .map_err(|_| format!("Invalid start value: {lo}"))?;
+            let end = hi
+                .parse::<usize>()
+                .map_err(|_| format!("Invalid end value: {hi}"))?;
+            return Ok(CpuAllocation::Range(start, end));
+        }
+        s.parse::<usize>()
+            .map(CpuAllocation::Count)
+            .map_err(|_| format!("Invalid shard count: {s}"))
+    }
+}
+
+impl Serialize for CpuAllocation {
+    /// Serializes the allocation in the textual form accepted by `CpuAllocation::from_str`,
+    /// so values round-trip through config files:
+    ///
+    /// - `All` → `"all"`
+    /// - `Count(n)` → the integer `n`
+    /// - `Range(a, b)` → `"a..b"`
+    /// - `NumaAware` → `"numa:auto"` when it equals the auto preset, otherwise
+    ///   `"numa:[nodes=…;]cores=…;no_ht=…"`.
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match self {
+            CpuAllocation::All => serializer.serialize_str("all"),
+            CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
+            CpuAllocation::Range(start, end) => {
+                serializer.serialize_str(&format!("{start}..{end}"))
+            }
+            CpuAllocation::NumaAware(numa) => {
+                // `numa:auto` parses back with avoid_hyperthread = true, so only use the
+                // shorthand when that holds; otherwise `no_ht=false` would be lost.
+                if numa.nodes.is_empty() && numa.cores_per_node == 0 && numa.avoid_hyperthread {
+                    return serializer.serialize_str("numa:auto");
+                }
+
+                let mut params = Vec::with_capacity(3);
+                // Omit the `nodes` key when the list is empty: the parser defaults a
+                // missing key to an empty list, so no empty `nodes=` value is emitted.
+                if !numa.nodes.is_empty() {
+                    let nodes = numa
+                        .nodes
+                        .iter()
+                        .map(|n| n.to_string())
+                        .collect::<Vec<_>>()
+                        .join(",");
+                    params.push(format!("nodes={nodes}"));
+                }
+                params.push(format!("cores={}", numa.cores_per_node));
+                params.push(format!("no_ht={}", numa.avoid_hyperthread));
+
+                serializer.serialize_str(&format!("numa:{}", params.join(";")))
+            }
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for CpuAllocation {
+    /// Accepts either a string in the `FromStr` syntax (`"all"`, `"4"`, `"2..8"`,
+    /// `"numa:..."`) or a bare integer, which becomes a shard count.
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        // Untagged: serde tries the variants in declaration order, so a string is
+        // attempted before an integer. The enum name appears in serde's
+        // "did not match any variant" error, so it is kept as-is.
+        #[derive(Deserialize)]
+        #[serde(untagged)]
+        enum CpuAllocationHelper {
+            String(String),
+            Number(usize),
+        }
+
+        let raw = CpuAllocationHelper::deserialize(deserializer)?;
+        match raw {
+            CpuAllocationHelper::Number(count) => Ok(CpuAllocation::Count(count)),
+            CpuAllocationHelper::String(text) => text
+                .parse::<CpuAllocation>()
+                .map_err(serde::de::Error::custom),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Parses `spec` and returns its NUMA payload, panicking for any other variant.
+    fn expect_numa(spec: &str) -> NumaConfig {
+        let CpuAllocation::NumaAware(numa) = CpuAllocation::from_str(spec).unwrap() else {
+            panic!("Expected NumaAware");
+        };
+        numa
+    }
+
+    #[test]
+    fn test_parse_all() {
+        let parsed = CpuAllocation::from_str("all").unwrap();
+        assert_eq!(parsed, CpuAllocation::All);
+    }
+
+    #[test]
+    fn test_parse_count() {
+        let parsed = CpuAllocation::from_str("4").unwrap();
+        assert_eq!(parsed, CpuAllocation::Count(4));
+    }
+
+    #[test]
+    fn test_parse_range() {
+        let parsed = CpuAllocation::from_str("2..8").unwrap();
+        assert_eq!(parsed, CpuAllocation::Range(2, 8));
+    }
+
+    #[test]
+    fn test_parse_numa_auto() {
+        let numa = expect_numa("numa:auto");
+        assert!(numa.nodes.is_empty());
+        assert_eq!(numa.cores_per_node, 0);
+        assert!(numa.avoid_hyperthread);
+    }
+
+    #[test]
+    fn test_parse_numa_explicit() {
+        let numa = expect_numa("numa:nodes=0,1;cores=4;no_ht=true");
+        assert_eq!(numa.nodes, vec![0, 1]);
+        assert_eq!(numa.cores_per_node, 4);
+        assert!(numa.avoid_hyperthread);
+    }
+
+    #[test]
+    fn test_numa_explicit_serde_roundtrip() {
+        let original = CpuAllocation::NumaAware(NumaConfig {
+            nodes: vec![0, 1],
+            cores_per_node: 4,
+            avoid_hyperthread: true,
+        });
+        let json = serde_json::to_string(&original).unwrap();
+        let restored: CpuAllocation = serde_json::from_str(&json).unwrap();
+        assert_eq!(original, restored);
+    }
+}
diff --git a/core/server/src/configs/system.rs
b/core/configs/src/server_config/system.rs
similarity index 99%
rename from core/server/src/configs/system.rs
rename to core/configs/src/server_config/system.rs
index 58de89ed5..91d9a99ca 100644
--- a/core/server/src/configs/system.rs
+++ b/core/configs/src/server_config/system.rs
@@ -17,8 +17,8 @@
*/
use super::cache_indexes::CacheIndexesConfig;
+use super::server::MemoryPoolConfig;
use super::sharding::ShardingConfig;
-use crate::configs::server::MemoryPoolConfig;
use configs::ConfigEnv;
use iggy_common::IggyByteSize;
use iggy_common::IggyError;
diff --git a/core/server/src/configs/tcp.rs
b/core/configs/src/server_config/tcp.rs
similarity index 100%
rename from core/server/src/configs/tcp.rs
rename to core/configs/src/server_config/tcp.rs
diff --git a/core/server/src/configs/validators.rs
b/core/configs/src/server_config/validators.rs
similarity index 92%
rename from core/server/src/configs/validators.rs
rename to core/configs/src/server_config/validators.rs
index b9d70aaf3..6a49157a8 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/configs/src/server_config/validators.rs
@@ -17,18 +17,16 @@
* under the License.
*/
+use super::COMPONENT;
use super::cluster::ClusterConfig;
use super::server::{
DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig,
TelemetryConfig,
};
+use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig};
use super::sharding::{CpuAllocation, ShardingConfig};
+use super::system::SegmentConfig;
use super::system::{CompressionConfig, LoggingConfig, PartitionConfig};
-use crate::configs::COMPONENT;
-use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig,
ServerConfig};
-use crate::configs::sharding::NumaTopology;
-use crate::configs::system::SegmentConfig;
-use crate::streaming::segments::*;
-use configs::ConfigurationError;
+use crate::ConfigurationError;
use err_trail::ErrContext;
use iggy_common::CompressionAlgorithm;
use iggy_common::IggyExpiry;
@@ -37,6 +35,9 @@ use iggy_common::Validatable;
use std::thread::available_parallelism;
use tracing::warn;
+/// 1 GiB max segment size. Canonical definition; re-exported by core/server
streaming.
+pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
+
impl Validatable<ConfigurationError> for ServerConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
self.system
@@ -339,7 +340,10 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
impl Validatable<ConfigurationError> for ShardingConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
let available_cpus = available_parallelism()
- .expect("Failed to get number of CPU cores")
+ .map_err(|_| {
+ eprintln!("Failed to detect available CPU cores");
+ ConfigurationError::InvalidConfigurationValue
+ })?
.get();
match &self.cpu_allocation {
@@ -372,18 +376,9 @@ impl Validatable<ConfigurationError> for ShardingConfig {
}
Ok(())
}
- CpuAllocation::NumaAware(numa_config) => match
NumaTopology::detect() {
- // TODO: dry the validation, already validate it from the
shard allocation
- Ok(topology) => numa_config.validate(&topology).map_err(|e| {
- eprintln!("Invalid NUMA configuration: {}", e);
- ConfigurationError::InvalidConfigurationValue
- }),
- Err(e) => {
- eprintln!("Failed to detect NUMA topology: {}", e);
- eprintln!("NUMA allocation requested but system doesn't
support it");
- Err(ConfigurationError::InvalidConfigurationValue)
- }
- },
+ // NUMA topology validation requires hwlocality (runtime dep).
+ // Full NUMA validation happens in server::shard_allocator at
startup.
+ CpuAllocation::NumaAware(_) => Ok(()),
}
}
}
@@ -394,19 +389,16 @@ impl Validatable<ConfigurationError> for ClusterConfig {
return Ok(());
}
- // Validate cluster name is not empty
if self.name.trim().is_empty() {
eprintln!("Invalid cluster configuration: cluster name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
- // Validate current node name is not empty
if self.node.current.name.trim().is_empty() {
eprintln!("Invalid cluster configuration: current node name cannot
be empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
- // Check for duplicate node names among other nodes
let mut node_names = std::collections::HashSet::new();
node_names.insert(self.node.current.name.clone());
@@ -420,16 +412,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
}
}
- // Validate each other node configuration
let mut used_endpoints = std::collections::HashSet::new();
for node in &self.node.others {
- // Validate node name is not empty
if node.name.trim().is_empty() {
eprintln!("Invalid cluster configuration: node name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
- // Validate IP is not empty
if node.ip.trim().is_empty() {
eprintln!(
"Invalid cluster configuration: IP cannot be empty for
node '{}'",
@@ -438,7 +427,6 @@ impl Validatable<ConfigurationError> for ClusterConfig {
return Err(ConfigurationError::InvalidConfigurationValue);
}
- // Validate transport ports if provided
let port_list = [
("TCP", node.ports.tcp),
("QUIC", node.ports.quic),
@@ -456,7 +444,6 @@ impl Validatable<ConfigurationError> for ClusterConfig {
return
Err(ConfigurationError::InvalidConfigurationValue);
}
- // Check for port conflicts across nodes on the same IP
let endpoint = format!("{}:{}:{}", node.ip, name, port);
if !used_endpoints.insert(endpoint.clone()) {
eprintln!(
diff --git a/core/server/src/configs/websocket.rs
b/core/configs/src/server_config/websocket.rs
similarity index 100%
rename from core/server/src/configs/websocket.rs
rename to core/configs/src/server_config/websocket.rs
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 057ae6281..39c3c5de1 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -54,13 +54,11 @@ ctrlc = { workspace = true }
cyper = { workspace = true }
cyper-axum = { workspace = true }
dashmap = { workspace = true }
-derive_more = { workspace = true }
dotenvy = { workspace = true }
enum_dispatch = { workspace = true }
err_trail = { workspace = true }
error_set = { workspace = true }
figlet-rs = { workspace = true }
-figment = { workspace = true }
flume = { workspace = true }
fs2 = { workspace = true }
futures = { workspace = true }
@@ -90,10 +88,8 @@ rustls = { workspace = true }
rustls-pemfile = { workspace = true }
send_wrapper = { workspace = true }
serde = { workspace = true }
-serde_with = { workspace = true }
slab = { workspace = true }
socket2 = { workspace = true }
-static-toml = { workspace = true }
strum = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
@@ -115,5 +111,4 @@ hwlocality = { workspace = true }
hwlocality = { workspace = true, features = ["vendored"] }
[build-dependencies]
-figment = { workspace = true, features = ["json", "toml", "env"] }
vergen-git2 = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 3d95fb9c3..02bd742f2 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -23,7 +23,6 @@ use crate::{
configs::{
cache_indexes::CacheIndexesConfig,
server::ServerConfig,
- sharding::ShardInfo,
system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
},
io::fs_utils::{self, DirEntry},
@@ -36,6 +35,7 @@ use crate::{
frame::ShardFrame,
},
},
+ shard_allocator::ShardInfo,
state::system::{StreamState, TopicState, UserState},
streaming::{
partitions::{
diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs.rs
similarity index 76%
rename from core/server/src/configs/mod.rs
rename to core/server/src/configs.rs
index 8bee32c44..dac4d0953 100644
--- a/core/server/src/configs/mod.rs
+++ b/core/server/src/configs.rs
@@ -17,17 +17,7 @@
* under the License.
*/
-pub mod cache_indexes;
-pub mod cluster;
-pub mod defaults;
-pub mod displays;
-pub mod http;
-pub mod quic;
-pub mod server;
-pub mod sharding;
-pub mod system;
-pub mod tcp;
-pub mod validators;
-pub mod websocket;
-
-pub const COMPONENT: &str = "CONFIG";
+pub use configs::{
+ COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server,
sharding, system,
+ tcp, validators, websocket,
+};
diff --git a/core/server/src/configs/sharding.rs
b/core/server/src/configs/sharding.rs
deleted file mode 100644
index 5224c0b47..000000000
--- a/core/server/src/configs/sharding.rs
+++ /dev/null
@@ -1,637 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use hwlocality::Topology;
-use hwlocality::bitmap::SpecializedBitmapRef;
-use hwlocality::cpu::cpuset::CpuSet;
-use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
-use hwlocality::object::types::ObjectType::{self, NUMANode};
-#[cfg(target_os = "linux")]
-use nix::{sched::sched_setaffinity, unistd::Pid};
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use std::collections::HashSet;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::thread::available_parallelism;
-use tracing::info;
-
-use crate::server_error::ServerError;
-use configs::ConfigEnv;
-
-#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
-pub struct ShardingConfig {
- #[serde(default)]
- #[config_env(leaf)]
- pub cpu_allocation: CpuAllocation,
-}
-
-#[derive(Debug, Clone, PartialEq, Default)]
-pub enum CpuAllocation {
- #[default]
- All,
- Count(usize),
- Range(usize, usize),
- NumaAware(NumaConfig),
-}
-
-/// NUMA specific configuration
-#[derive(Debug, Clone, PartialEq, Default)]
-pub struct NumaConfig {
- /// Which NUMA nodes to use (empty = auto-detect all)
- pub nodes: Vec<usize>,
- /// Cores per node to use (0 = use all available)
- pub cores_per_node: usize,
- /// skip hyperthread sibling
- pub avoid_hyperthread: bool,
-}
-
-impl NumaConfig {
- pub fn validate(&self, topology: &NumaTopology) -> Result<(), ServerError>
{
- let available_nodes = topology.node_count;
-
- if available_nodes == 0 {
- return Err(ServerError::NoNumaNodes);
- }
-
- for &node in &self.nodes {
- if node >= available_nodes {
- return Err(ServerError::InvalidNode {
- requested: node,
- available: available_nodes,
- });
- }
- }
-
- // Validate core per node
- if self.cores_per_node > 0 {
- for &node in &self.nodes {
- let available_cores = if self.avoid_hyperthread {
- topology.physical_cores_for_node(node)
- } else {
- topology.logical_cores_for_node(node)
- };
-
- info!(
- "core_per_node: {}, available_cores: {}",
- self.cores_per_node, available_cores
- );
-
- if self.cores_per_node > available_cores {
- return Err(ServerError::InsufficientCores {
- requested: self.cores_per_node,
- available: available_cores,
- node,
- });
- }
- }
- }
-
- Ok(())
- }
-}
-
-impl CpuAllocation {
- fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
- let params = s
- .strip_prefix("numa:")
- .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;
-
- if params == "auto" {
- return Ok(CpuAllocation::NumaAware(NumaConfig {
- nodes: vec![],
- cores_per_node: 0,
- avoid_hyperthread: true,
- }));
- }
-
- let mut nodes = Vec::new();
- let mut cores_per_node = 0;
- let mut avoid_hyperthread = true;
-
- for param in params.split(';') {
- let kv: Vec<&str> = param.split('=').collect();
- if kv.len() != 2 {
- return Err(format!(
- "Invalid NUMA parameter: '{param}', only available: 'auto'"
- ));
- }
-
- match kv[0] {
- "nodes" => {
- nodes = kv[1]
- .split(',')
- .map(|n| {
- n.parse::<usize>()
- .map_err(|_| format!("Invalid node number:
{n}"))
- })
- .collect::<Result<Vec<_>, _>>()?;
- }
- "cores" => {
- cores_per_node = kv[1]
- .parse::<usize>()
- .map_err(|_| format!("Invalid cores value: {}",
kv[1]))?;
- }
- "no_ht" => {
- avoid_hyperthread = kv[1]
- .parse::<bool>()
- .map_err(|_| format!("Invalid no ht value: {}",
kv[1]))?;
- }
- _ => {
- return Err(format!(
- "Unknown NUMA parameter: {}, example:
numa:nodes=0;cores=4;no_ht=true",
- kv[0]
- ));
- }
- }
- }
-
- Ok(CpuAllocation::NumaAware(NumaConfig {
- nodes,
- cores_per_node,
- avoid_hyperthread,
- }))
- }
-}
-
-#[derive(Debug)]
-pub struct NumaTopology {
- topology: Topology,
- node_count: usize,
- physical_cores_per_node: Vec<usize>,
- logical_cores_per_node: Vec<usize>,
-}
-
-impl NumaTopology {
- pub fn detect() -> Result<NumaTopology, ServerError> {
- let topology =
- Topology::new().map_err(|e| ServerError::TopologyDetection { msg:
e.to_string() })?;
-
- let numa_nodes: Vec<_> =
topology.objects_with_type(NUMANode).collect();
-
- let node_count = numa_nodes.len();
-
- if node_count == 0 {
- return Err(ServerError::NoNumaNodes);
- }
-
- let mut physical_cores_per_node = Vec::new();
- let mut logical_cores_per_node = Vec::new();
-
- for node in numa_nodes {
- let cpuset = node.cpuset().ok_or(ServerError::TopologyDetection {
- msg: "NUMA node has no CPU set".to_string(),
- })?;
-
- let logical_cores = cpuset.weight().unwrap_or(0);
-
- let physical_cores = topology
- .objects_with_type(ObjectType::Core)
- .filter(|core| {
- if let Some(core_cpuset) = core.cpuset() {
- !(cpuset & core_cpuset).is_empty()
- } else {
- false
- }
- })
- .count();
-
- physical_cores_per_node.push(physical_cores);
- logical_cores_per_node.push(logical_cores);
- }
-
- Ok(Self {
- topology,
- node_count,
- physical_cores_per_node,
- logical_cores_per_node,
- })
- }
-
- pub fn physical_cores_for_node(&self, node: usize) -> usize {
- self.physical_cores_per_node.get(node).copied().unwrap_or(0)
- }
-
- pub fn logical_cores_for_node(&self, node: usize) -> usize {
- self.logical_cores_per_node.get(node).copied().unwrap_or(0)
- }
-
- fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet {
- let mut physical_cpuset = CpuSet::new();
- for core in self.topology.objects_with_type(ObjectType::Core) {
- if let Some(core_cpuset) = core.cpuset() {
- let intersection = node_cpuset.clone() & core_cpuset;
- if !intersection.is_empty() {
- // Take the minimum (first) CPU ID for consistency
- if let Some(first_cpu) = intersection.iter_set().min() {
- physical_cpuset.set(first_cpu)
- }
- }
- }
- }
- physical_cpuset
- }
-
- /// Get CPU set for a NUMA node
- fn get_cpuset_for_node(
- &self,
- node_id: usize,
- avoid_hyperthread: bool,
- ) -> Result<CpuSet, ServerError> {
- let node = self
- .topology
- .objects_with_type(ObjectType::NUMANode)
- .nth(node_id)
- .ok_or(ServerError::InvalidNode {
- requested: node_id,
- available: self.node_count,
- })?;
-
- let cpuset_ref = node.cpuset().ok_or(ServerError::TopologyDetection {
- msg: format!("Node {} has no CPU set", node_id),
- })?;
-
- let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref);
-
- if avoid_hyperthread {
- Ok(self.filter_physical_cores(cpuset))
- } else {
- Ok(cpuset)
- }
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct ShardInfo {
- /// CPUs this shard should use
- pub cpu_set: HashSet<usize>,
- /// NUMA node
- pub numa_node: Option<usize>,
-}
-
-impl ShardInfo {
- pub fn bind_cpu(&self) -> Result<(), ServerError> {
- #[cfg(target_os = "linux")]
- {
- if self.cpu_set.is_empty() {
- return Ok(());
- }
-
- let mut cpuset = nix::sched::CpuSet::new();
- for &cpu in &self.cpu_set {
- cpuset.set(cpu).map_err(|_| ServerError::BindingFailed)?;
- }
-
- sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| {
- tracing::error!("Failed to set CPU affinity: {:?}", e);
- ServerError::BindingFailed
- })?;
-
- info!("Thread bound to CPUs: {:?}", self.cpu_set);
- }
-
- #[cfg(not(target_os = "linux"))]
- {
- tracing::debug!("CPU affinity binding skipped on non-Linux
platform");
- }
-
- Ok(())
- }
-
- pub fn bind_memory(&self) -> Result<(), ServerError> {
- if let Some(node_id) = self.numa_node {
- let topology = Topology::new().map_err(|err|
ServerError::TopologyDetection {
- msg: err.to_string(),
- })?;
-
- let node = topology
- .objects_with_type(ObjectType::NUMANode)
- .nth(node_id)
- .ok_or(ServerError::InvalidNode {
- requested: node_id,
- available:
topology.objects_with_type(ObjectType::NUMANode).count(),
- })?;
-
- if let Some(nodeset) = node.nodeset() {
- topology
- .bind_memory(
- nodeset,
- MemoryBindingPolicy::Bind,
- MemoryBindingFlags::THREAD |
MemoryBindingFlags::STRICT,
- )
- .map_err(|err| {
- tracing::error!("Failed to bind memory {:?}", err);
- ServerError::BindingFailed
- })?;
-
- info!("Memory bound to NUMA node {node_id}");
- }
- }
-
- Ok(())
- }
-}
-
-pub struct ShardAllocator {
- allocation: CpuAllocation,
- topology: Option<Arc<NumaTopology>>,
-}
-
-impl ShardAllocator {
- pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator,
ServerError> {
- let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) {
- let numa_topology = NumaTopology::detect()?;
-
- Some(Arc::new(numa_topology))
- } else {
- None
- };
-
- Ok(Self {
- allocation: allocation.clone(),
- topology,
- })
- }
-
- pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, ServerError> {
- match &self.allocation {
- CpuAllocation::All => {
- let available_cpus = available_parallelism()
- .map_err(|err| ServerError::Other {
- msg: format!("Failed to get available_parallelism:
{:?}", err),
- })?
- .get();
-
- let shard_assignments: Vec<_> = (0..available_cpus)
- .map(|cpu_id| ShardInfo {
- cpu_set: HashSet::from([cpu_id]),
- numa_node: None,
- })
- .collect();
-
- info!(
- "Using all available CPU cores ({} shards with affinity)",
- shard_assignments.len()
- );
-
- Ok(shard_assignments)
- }
- CpuAllocation::Count(count) => {
- let shard_assignments = (0..*count)
- .map(|cpu_id| ShardInfo {
- cpu_set: HashSet::from([cpu_id]),
- numa_node: None,
- })
- .collect();
-
- info!("Using {count} shards with affinity to cores
0..{count}");
-
- Ok(shard_assignments)
- }
- CpuAllocation::Range(start, end) => {
- let shard_assignments = (*start..*end)
- .map(|cpu_id| ShardInfo {
- cpu_set: HashSet::from([cpu_id]),
- numa_node: None,
- })
- .collect();
-
- info!(
- "Using {} shards with affinity to cores {start}..{end}",
- end - start
- );
-
- Ok(shard_assignments)
- }
- CpuAllocation::NumaAware(numa_config) => {
- let topology =
self.topology.as_ref().ok_or(ServerError::NoTopology)?;
- self.compute_numa_assignments(topology, numa_config)
- }
- }
- }
-
- fn compute_numa_assignments(
- &self,
- topology: &NumaTopology,
- numa: &NumaConfig,
- ) -> Result<Vec<ShardInfo>, ServerError> {
- // Determine which noes to use
- let nodes = if numa.nodes.is_empty() {
- // Auto: use all nodes
- (0..topology.node_count).collect()
- } else {
- numa.nodes.clone()
- };
-
- // Determine cores per node
- let cores_per_node = if numa.cores_per_node == 0 {
- // Auto: use all available
- if numa.avoid_hyperthread {
- topology.physical_cores_for_node(nodes[0])
- } else {
- topology.logical_cores_for_node(nodes[0])
- }
- } else {
- numa.cores_per_node
- };
-
- let mut shard_infos = Vec::new();
-
- let node_cpus: Vec<Vec<usize>> = nodes
- .iter()
- .map(|&node_id| {
- let cpuset = topology.get_cpuset_for_node(node_id,
numa.avoid_hyperthread)?;
- Ok(cpuset.iter_set().map(usize::from).collect())
- })
- .collect::<Result<_, ServerError>>()?;
-
- // For each node, create one shard per core (thread-per-core)
- for (idx, &node_id) in nodes.iter().enumerate() {
- let available_cpus = &node_cpus[idx];
-
- // Take first core_per_node CPUs from this node
- let cores_to_use: Vec<usize> = available_cpus
- .iter()
- .take(cores_per_node)
- .copied()
- .collect();
-
- if cores_to_use.len() < cores_per_node {
- return Err(ServerError::InsufficientCores {
- requested: cores_per_node,
- available: available_cpus.len(),
- node: node_id,
- });
- }
-
- // Create one shard per core
- for cpu_id in cores_to_use {
- shard_infos.push(ShardInfo {
- cpu_set: HashSet::from([cpu_id]),
- numa_node: Some(node_id),
- });
- }
- }
-
- info!(
- "Using {} shards with {} NUMA node, {} cores per node, and avoid
hyperthread {}",
- shard_infos.len(),
- nodes.len(),
- cores_per_node,
- numa.avoid_hyperthread
- );
-
- Ok(shard_infos)
- }
-}
-
-impl FromStr for CpuAllocation {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "all" => Ok(CpuAllocation::All),
- s if s.starts_with("numa:") => Self::parse_numa(s),
- s if s.contains("..") => {
- let parts: Vec<&str> = s.split("..").collect();
- if parts.len() != 2 {
- return Err(format!("Invalid range format: {s}. Expected
'start..end'"));
- }
- let start = parts[0]
- .parse::<usize>()
- .map_err(|_| format!("Invalid start value: {}",
parts[0]))?;
- let end = parts[1]
- .parse::<usize>()
- .map_err(|_| format!("Invalid end value: {}", parts[1]))?;
- Ok(CpuAllocation::Range(start, end))
- }
- s => {
- let count = s
- .parse::<usize>()
- .map_err(|_| format!("Invalid shard count: {s}"))?;
- Ok(CpuAllocation::Count(count))
- }
- }
- }
-}
-
-impl Serialize for CpuAllocation {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- match self {
- CpuAllocation::All => serializer.serialize_str("all"),
- CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
- CpuAllocation::Range(start, end) => {
- serializer.serialize_str(&format!("{start}..{end}"))
- }
- CpuAllocation::NumaAware(numa) => {
- if numa.nodes.is_empty() && numa.cores_per_node == 0 {
- serializer.serialize_str("numa:auto")
- } else {
- let nodes_str = numa
- .nodes
- .iter()
- .map(|n| n.to_string())
- .collect::<Vec<_>>()
- .join(",");
-
- let full_str = format!(
- "numa:nodes={};cores:{};no_ht={}",
- nodes_str, numa.cores_per_node, numa.avoid_hyperthread
- );
-
- serializer.serialize_str(&full_str)
- }
- }
- }
- }
-}
-
-impl<'de> Deserialize<'de> for CpuAllocation {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: Deserializer<'de>,
- {
- #[derive(Deserialize)]
- #[serde(untagged)]
- enum CpuAllocationHelper {
- String(String),
- Number(usize),
- }
-
- match CpuAllocationHelper::deserialize(deserializer)? {
- CpuAllocationHelper::String(s) => {
- CpuAllocation::from_str(&s).map_err(serde::de::Error::custom)
- }
- CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_parse_all() {
- assert_eq!(CpuAllocation::from_str("all").unwrap(),
CpuAllocation::All);
- }
-
- #[test]
- fn test_parse_count() {
- assert_eq!(
- CpuAllocation::from_str("4").unwrap(),
- CpuAllocation::Count(4)
- );
- }
-
- #[test]
- fn test_parse_range() {
- assert_eq!(
- CpuAllocation::from_str("2..8").unwrap(),
- CpuAllocation::Range(2, 8)
- );
- }
-
- #[test]
- fn test_parse_numa_auto() {
- let result = CpuAllocation::from_str("numa:auto").unwrap();
- match result {
- CpuAllocation::NumaAware(numa) => {
- assert!(numa.nodes.is_empty());
- assert_eq!(numa.cores_per_node, 0);
- assert!(numa.avoid_hyperthread);
- }
- _ => panic!("Expected NumaAware"),
- }
- }
-
- #[test]
- fn test_parse_numa_explicit() {
- let result =
CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap();
- match result {
- CpuAllocation::NumaAware(numa) => {
- assert_eq!(numa.nodes, vec![0, 1]);
- assert_eq!(numa.cores_per_node, 4);
- assert!(numa.avoid_hyperthread);
- }
- _ => panic!("Expected NumaAware"),
- }
- }
-}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 1fc0d0d3f..b4bac970d 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -42,6 +42,7 @@ pub mod metadata;
pub mod quic;
pub mod server_error;
pub mod shard;
+pub mod shard_allocator;
pub mod state;
pub mod streaming;
pub mod tcp;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index d3ce11f49..6a7fc1e06 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -32,7 +32,6 @@ use server::bootstrap::{
create_directories, create_shard_connections, create_shard_executor,
load_config,
load_metadata, resolve_persister, update_system_info,
};
-use server::configs::sharding::ShardAllocator;
use server::diagnostics::{print_io_uring_permission_info,
print_locked_memory_limit_info};
use server::io::fs_utils;
use server::log::logger::Logging;
@@ -40,6 +39,7 @@ use server::metadata::{Metadata, create_metadata_handles};
use server::server_error::ServerError;
use server::shard::system::info::SystemInfo;
use server::shard::{IggyShard, calculate_shard_assignment};
+use server::shard_allocator::ShardAllocator;
use server::state::file::FileState;
use server::state::system::SystemState;
use server::streaming::clients::client_manager::{Client, ClientManager};
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index 74dd71118..6f0124eb2 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -37,34 +37,8 @@ error_set!(
}
NumaError := {
- #[display("Failed to detect topology: {}", msg)]
- TopologyDetection {
- msg: String
- },
-
- #[display("There is no NUMA node on this server")]
- NoNumaNodes,
-
- #[display("No Topology")]
- NoTopology,
-
- #[display("Binding Failed")]
- BindingFailed,
-
- #[display("Insufficient cores on node {}: requested {}, only {}
available", node, requested, available)]
- InsufficientCores {
- requested: usize,
- available: usize,
- node: usize,
- },
-
- #[display("Invalid NUMA node: requested {}, only available {} node",
requested, available)]
- InvalidNode { requested: usize, available: usize },
-
- #[display("Other error: {}", msg)]
- Other {
- msg: String
- },
+ #[display("{0}")]
+ Sharding(crate::shard_allocator::ShardingError),
}
ConfigurationError := {
diff --git a/core/server/src/shard_allocator.rs
b/core/server/src/shard_allocator.rs
new file mode 100644
index 000000000..334c09f28
--- /dev/null
+++ b/core/server/src/shard_allocator.rs
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use configs::sharding::{CpuAllocation, NumaConfig};
+use hwlocality::Topology;
+use hwlocality::bitmap::SpecializedBitmapRef;
+use hwlocality::cpu::cpuset::CpuSet;
+use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy};
+use hwlocality::object::types::ObjectType::{self, NUMANode};
+#[cfg(target_os = "linux")]
+use nix::{sched::sched_setaffinity, unistd::Pid};
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::thread::available_parallelism;
+use tracing::info;
+
+#[derive(Debug, thiserror::Error)]
+pub enum ShardingError {
+ #[error("Failed to detect topology: {msg}")]
+ TopologyDetection { msg: String },
+
+ #[error("There is no NUMA node on this server")]
+ NoNumaNodes,
+
+ #[error("No Topology")]
+ NoTopology,
+
+ #[error("Binding Failed")]
+ BindingFailed,
+
+ #[error("Insufficient cores on node {node}: requested {requested}, only
{available} available")]
+ InsufficientCores {
+ requested: usize,
+ available: usize,
+ node: usize,
+ },
+
+ #[error("Invalid NUMA node: requested {requested}, only available
{available} node")]
+ InvalidNode { requested: usize, available: usize },
+
+ #[error("Other error: {msg}")]
+ Other { msg: String },
+}
+
+#[derive(Debug)]
+pub struct NumaTopology {
+ topology: Topology,
+ node_count: usize,
+ physical_cores_per_node: Vec<usize>,
+ logical_cores_per_node: Vec<usize>,
+}
+
+impl NumaTopology {
+ pub fn detect() -> Result<NumaTopology, ShardingError> {
+ let topology =
+ Topology::new().map_err(|e| ShardingError::TopologyDetection {
msg: e.to_string() })?;
+
+ let numa_nodes: Vec<_> =
topology.objects_with_type(NUMANode).collect();
+
+ let node_count = numa_nodes.len();
+
+ if node_count == 0 {
+ return Err(ShardingError::NoNumaNodes);
+ }
+
+ let mut physical_cores_per_node = Vec::new();
+ let mut logical_cores_per_node = Vec::new();
+
+ for node in numa_nodes {
+ let cpuset = node.cpuset().ok_or(ShardingError::TopologyDetection {
+ msg: "NUMA node has no CPU set".to_string(),
+ })?;
+
+ let logical_cores = cpuset.weight().unwrap_or(0);
+
+ let physical_cores = topology
+ .objects_with_type(ObjectType::Core)
+ .filter(|core| {
+ if let Some(core_cpuset) = core.cpuset() {
+ !(cpuset & core_cpuset).is_empty()
+ } else {
+ false
+ }
+ })
+ .count();
+
+ physical_cores_per_node.push(physical_cores);
+ logical_cores_per_node.push(logical_cores);
+ }
+
+ Ok(Self {
+ topology,
+ node_count,
+ physical_cores_per_node,
+ logical_cores_per_node,
+ })
+ }
+
+ pub fn physical_cores_for_node(&self, node: usize) -> usize {
+ self.physical_cores_per_node.get(node).copied().unwrap_or(0)
+ }
+
+ pub fn logical_cores_for_node(&self, node: usize) -> usize {
+ self.logical_cores_per_node.get(node).copied().unwrap_or(0)
+ }
+
+ fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet {
+ let mut physical_cpuset = CpuSet::new();
+ for core in self.topology.objects_with_type(ObjectType::Core) {
+ if let Some(core_cpuset) = core.cpuset() {
+ let intersection = node_cpuset.clone() & core_cpuset;
+ if !intersection.is_empty()
+ && let Some(first_cpu) = intersection.iter_set().min()
+ {
+ physical_cpuset.set(first_cpu)
+ }
+ }
+ }
+ physical_cpuset
+ }
+
+ fn get_cpuset_for_node(
+ &self,
+ node_id: usize,
+ avoid_hyperthread: bool,
+ ) -> Result<CpuSet, ShardingError> {
+ let node = self
+ .topology
+ .objects_with_type(ObjectType::NUMANode)
+ .nth(node_id)
+ .ok_or(ShardingError::InvalidNode {
+ requested: node_id,
+ available: self.node_count,
+ })?;
+
+ let cpuset_ref = node.cpuset().ok_or(ShardingError::TopologyDetection {
+ msg: format!("Node {} has no CPU set", node_id),
+ })?;
+
+ let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref);
+
+ if avoid_hyperthread {
+ Ok(self.filter_physical_cores(cpuset))
+ } else {
+ Ok(cpuset)
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct ShardInfo {
+ pub cpu_set: HashSet<usize>,
+ pub numa_node: Option<usize>,
+}
+
+impl ShardInfo {
+ pub fn bind_cpu(&self) -> Result<(), ShardingError> {
+ #[cfg(target_os = "linux")]
+ {
+ if self.cpu_set.is_empty() {
+ return Ok(());
+ }
+
+ let mut cpuset = nix::sched::CpuSet::new();
+ for &cpu in &self.cpu_set {
+ cpuset.set(cpu).map_err(|_| ShardingError::BindingFailed)?;
+ }
+
+ sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| {
+ tracing::error!("Failed to set CPU affinity: {:?}", e);
+ ShardingError::BindingFailed
+ })?;
+
+ info!("Thread bound to CPUs: {:?}", self.cpu_set);
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+ tracing::debug!("CPU affinity binding skipped on non-Linux
platform");
+ }
+
+ Ok(())
+ }
+
+ pub fn bind_memory(&self) -> Result<(), ShardingError> {
+ if let Some(node_id) = self.numa_node {
+ let topology = Topology::new().map_err(|err|
ShardingError::TopologyDetection {
+ msg: err.to_string(),
+ })?;
+
+ let node = topology
+ .objects_with_type(ObjectType::NUMANode)
+ .nth(node_id)
+ .ok_or(ShardingError::InvalidNode {
+ requested: node_id,
+ available:
topology.objects_with_type(ObjectType::NUMANode).count(),
+ })?;
+
+ if let Some(nodeset) = node.nodeset() {
+ topology
+ .bind_memory(
+ nodeset,
+ MemoryBindingPolicy::Bind,
+ MemoryBindingFlags::THREAD |
MemoryBindingFlags::STRICT,
+ )
+ .map_err(|err| {
+ tracing::error!("Failed to bind memory {:?}", err);
+ ShardingError::BindingFailed
+ })?;
+
+ info!("Memory bound to NUMA node {node_id}");
+ }
+ }
+
+ Ok(())
+ }
+}
+
+pub struct ShardAllocator {
+ allocation: CpuAllocation,
+ topology: Option<Arc<NumaTopology>>,
+}
+
+impl ShardAllocator {
+ pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator,
ShardingError> {
+ let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) {
+ let numa_topology = NumaTopology::detect()?;
+
+ Some(Arc::new(numa_topology))
+ } else {
+ None
+ };
+
+ Ok(Self {
+ allocation: allocation.clone(),
+ topology,
+ })
+ }
+
+ pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>,
ShardingError> {
+ match &self.allocation {
+ CpuAllocation::All => {
+ let available_cpus = available_parallelism()
+ .map_err(|err| ShardingError::Other {
+ msg: format!("Failed to get available_parallelism:
{:?}", err),
+ })?
+ .get();
+
+ let shard_assignments: Vec<_> = (0..available_cpus)
+ .map(|cpu_id| ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: None,
+ })
+ .collect();
+
+ info!(
+ "Using all available CPU cores ({} shards with affinity)",
+ shard_assignments.len()
+ );
+
+ Ok(shard_assignments)
+ }
+ CpuAllocation::Count(count) => {
+ let shard_assignments = (0..*count)
+ .map(|cpu_id| ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: None,
+ })
+ .collect();
+
+ info!("Using {count} shards with affinity to cores
0..{count}");
+
+ Ok(shard_assignments)
+ }
+ CpuAllocation::Range(start, end) => {
+ let shard_assignments = (*start..*end)
+ .map(|cpu_id| ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: None,
+ })
+ .collect();
+
+ info!(
+ "Using {} shards with affinity to cores {start}..{end}",
+ end - start
+ );
+
+ Ok(shard_assignments)
+ }
+ CpuAllocation::NumaAware(numa_config) => {
+ let topology =
self.topology.as_ref().ok_or(ShardingError::NoTopology)?;
+ self.compute_numa_assignments(topology, numa_config)
+ }
+ }
+ }
+
+ fn compute_numa_assignments(
+ &self,
+ topology: &NumaTopology,
+ numa: &NumaConfig,
+ ) -> Result<Vec<ShardInfo>, ShardingError> {
+ let nodes = if numa.nodes.is_empty() {
+ (0..topology.node_count).collect()
+ } else {
+ numa.nodes.clone()
+ };
+
+ let cores_per_node = if numa.cores_per_node == 0 {
+ if numa.avoid_hyperthread {
+ topology.physical_cores_for_node(nodes[0])
+ } else {
+ topology.logical_cores_for_node(nodes[0])
+ }
+ } else {
+ numa.cores_per_node
+ };
+
+ let mut shard_infos = Vec::new();
+
+ let node_cpus: Vec<Vec<usize>> = nodes
+ .iter()
+ .map(|&node_id| {
+ let cpuset = topology.get_cpuset_for_node(node_id,
numa.avoid_hyperthread)?;
+ Ok(cpuset.iter_set().map(usize::from).collect())
+ })
+ .collect::<Result<_, ShardingError>>()?;
+
+ for (idx, &node_id) in nodes.iter().enumerate() {
+ let available_cpus = &node_cpus[idx];
+
+ let cores_to_use: Vec<usize> = available_cpus
+ .iter()
+ .take(cores_per_node)
+ .copied()
+ .collect();
+
+ if cores_to_use.len() < cores_per_node {
+ return Err(ShardingError::InsufficientCores {
+ requested: cores_per_node,
+ available: available_cpus.len(),
+ node: node_id,
+ });
+ }
+
+ for cpu_id in cores_to_use {
+ shard_infos.push(ShardInfo {
+ cpu_set: HashSet::from([cpu_id]),
+ numa_node: Some(node_id),
+ });
+ }
+ }
+
+ info!(
+ "Using {} shards with {} NUMA node, {} cores per node, and avoid
hyperthread {}",
+ shard_infos.len(),
+ nodes.len(),
+ cores_per_node,
+ numa.avoid_hyperthread
+ );
+
+ Ok(shard_infos)
+ }
+}
diff --git a/core/server/src/streaming/segments/mod.rs
b/core/server/src/streaming/segments/mod.rs
index 878af0eb1..4b3b883d9 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -32,4 +32,4 @@ pub use types::IggyMessageViewMut;
pub use types::IggyMessagesBatchMut;
pub use types::IggyMessagesBatchSet;
-pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
+pub use crate::configs::validators::SEGMENT_MAX_SIZE_BYTES;