This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 7607c9d1 feat(io_uring): dump config to file after shard starts (#2024)
7607c9d1 is described below
commit 7607c9d14bd18664e83b67c13b578781c495ad37
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Jul 19 16:58:37 2025 +0200
feat(io_uring): dump config to file after shard starts (#2024)
Co-authored-by: Hubert Gruszecki <[email protected]>
---
core/server/src/configs/sharding.rs | 19 ++++++++--
core/server/src/shard/builder.rs | 7 ++--
core/server/src/shard/mod.rs | 8 +++++
core/server/src/shard/transmission/event.rs | 3 ++
core/server/src/tcp/mod.rs | 2 +-
core/server/src/tcp/tcp_listener.rs | 56 +++++++++++++++++++++++++++--
core/server/src/tcp/tcp_server.rs | 3 +-
7 files changed, 86 insertions(+), 12 deletions(-)
diff --git a/core/server/src/configs/sharding.rs
b/core/server/src/configs/sharding.rs
index 87679676..1accd214 100644
--- a/core/server/src/configs/sharding.rs
+++ b/core/server/src/configs/sharding.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use serde::{Deserialize, Deserializer, Serialize};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashSet;
use std::str::FromStr;
use std::thread::available_parallelism;
@@ -35,7 +35,7 @@ impl Default for ShardingConfig {
}
}
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq)]
pub enum CpuAllocation {
All,
Count(usize),
@@ -92,6 +92,21 @@ impl FromStr for CpuAllocation {
}
}
+impl Serialize for CpuAllocation {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ match self {
+ CpuAllocation::All => serializer.serialize_str("all"),
+ CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
+ CpuAllocation::Range(start, end) => {
+ serializer.serialize_str(&format!("{start}..{end}"))
+ }
+ }
+ }
+}
+
impl<'de> Deserialize<'de> for CpuAllocation {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index c49cdeb6..65c3592a 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -16,11 +16,7 @@
* under the License.
*/
-use std::{
- cell::Cell,
- rc::Rc,
- sync::{Arc, atomic::AtomicBool},
-};
+use std::{cell::Cell, rc::Rc, sync::atomic::AtomicBool};
use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
use tracing::info;
@@ -130,6 +126,7 @@ impl IggyShardBuilder {
metrics: Metrics::init(),
task_registry: TaskRegistry::new(),
is_shutting_down: AtomicBool::new(false),
+ tcp_bound_address: Cell::new(None),
users: Default::default(),
permissioner: Default::default(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 43280c77..9de75aa5 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,6 +37,7 @@ use namespace::IggyNamespace;
use std::{
cell::{Cell, RefCell},
future::Future,
+ net::SocketAddr,
pin::Pin,
rc::Rc,
str::FromStr,
@@ -159,6 +160,7 @@ pub struct IggyShard {
pub(crate) stop_sender: StopSender,
pub(crate) task_registry: TaskRegistry,
pub(crate) is_shutting_down: AtomicBool,
+ pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>,
}
impl IggyShard {
@@ -208,6 +210,7 @@ impl IggyShard {
stop_sender,
task_registry: TaskRegistry::new(),
is_shutting_down: AtomicBool::new(false),
+ tcp_bound_address: Cell::new(None),
};
let user = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
shard
@@ -850,6 +853,11 @@ impl IggyShard {
self.update_permissions_bypass_auth(user_id,
permissions.to_owned())?;
Ok(())
}
+ ShardEvent::TcpBound { address } => {
+ info!("Received TcpBound event with address: {}", address);
+ self.tcp_bound_address.set(Some(*address));
+ Ok(())
+ }
}
}
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index d2462e2f..67997242 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -130,4 +130,7 @@ pub enum ShardEvent {
address: SocketAddr,
transport: Transport,
},
+ TcpBound {
+ address: SocketAddr,
+ },
}
diff --git a/core/server/src/tcp/mod.rs b/core/server/src/tcp/mod.rs
index 3e2ab77d..3eb84c1c 100644
--- a/core/server/src/tcp/mod.rs
+++ b/core/server/src/tcp/mod.rs
@@ -21,7 +21,7 @@ pub mod sender;
pub mod tcp_listener;
pub mod tcp_sender;
pub mod tcp_server;
-mod tcp_socket;
+pub mod tcp_socket;
pub mod tcp_tls_listener;
pub mod tcp_tls_sender;
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index 6ad2cf08..db0bcfaf 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -28,6 +28,7 @@ use futures::FutureExt;
use iggy_common::IggyError;
use std::net::SocketAddr;
use std::rc::Rc;
+use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};
@@ -64,17 +65,68 @@ async fn create_listener(
pub async fn start(
server_name: &'static str,
- addr: SocketAddr,
+ mut addr: SocketAddr,
config: &TcpSocketConfig,
shard: Rc<IggyShard>,
) -> Result<(), IggyError> {
+ if shard.id != 0 && addr.port() == 0 {
+ info!("Shard {} waiting for TCP address from shard 0...", shard.id);
+ loop {
+ if let Some(bound_addr) = shard.tcp_bound_address.get() {
+ addr = bound_addr;
+ info!("Shard {} received TCP address: {}", shard.id, addr);
+ break;
+ }
+ compio::time::sleep(Duration::from_millis(10)).await;
+ }
+ }
+
let listener = create_listener(addr, config)
.await
.map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))
.with_error_context(|err| {
format!("Failed to bind {server_name} server to address: {addr},
{err}")
})?;
- info!("{server_name} server has started on: {:?}", addr);
+ let actual_addr = listener.local_addr().map_err(|e| {
+ error!("Failed to get local address: {e}");
+ IggyError::CannotBindToSocket(addr.to_string())
+ })?;
+ info!("{server_name} server has started on: {:?}", actual_addr);
+
+ if shard.id == 0 {
+ if addr.port() == 0 {
+ let event = ShardEvent::TcpBound {
+ address: actual_addr,
+ };
+ shard.broadcast_event_to_all_shards(Arc::new(event)).await;
+ }
+
+ let mut current_config = shard.config.clone();
+ current_config.tcp.address = actual_addr.to_string();
+
+ let runtime_path = current_config.system.get_runtime_path();
+ let current_config_path =
format!("{runtime_path}/current_config.toml");
+ let current_config_content =
+ toml::to_string(&current_config).expect("Cannot serialize
current_config");
+
+ let buf_result = compio::fs::write(&current_config_path,
current_config_content).await;
+ match buf_result.0 {
+ Ok(_) => info!("Current config written to: {}",
current_config_path),
+ Err(e) => error!(
+ "Failed to write current config to {}: {}",
+ current_config_path, e
+ ),
+ }
+ }
+
+ accept_loop(server_name, listener, shard).await
+}
+
+async fn accept_loop(
+ server_name: &'static str,
+ listener: TcpListener,
+ shard: Rc<IggyShard>,
+) -> Result<(), IggyError> {
loop {
let shutdown_check = async {
loop {
diff --git a/core/server/src/tcp/tcp_server.rs
b/core/server/src/tcp/tcp_server.rs
index 6b299bff..22a48144 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -24,7 +24,6 @@ use std::rc::Rc;
use tracing::info;
/// Starts the TCP server.
-/// Returns the address the server is listening on.
pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
let server_name = if shard.config.tcp.tls.enabled {
"Iggy TCP TLS"
@@ -41,7 +40,7 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
.expect("Failed to parse TCP address");
let socket = tcp_socket::build(ip_v6, socket_config);
info!("Initializing {server_name} server...");
- // TODO: Fixme -- storing addr of the server inside of the config for
integration tests...
+
match shard.config.tcp.tls.enabled {
true => tcp_tls_listener::start(server_name, addr, socket,
shard.clone()).await?,
false => tcp_listener::start(server_name, addr, socket_config,
shard.clone()).await?,