This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_cli_fixes in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 18faf2885546e3f4d1df8044b4a9dd2062595a74 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Oct 3 17:51:02 2025 +0200 Make test server run on 4 random cores --- core/integration/src/test_server.rs | 43 +++++++++++++++++++++++++++---------- core/server/src/bootstrap.rs | 8 +++---- core/server/src/main.rs | 2 +- core/server/src/shard/mod.rs | 43 ++++++++++++++++++++++--------------- core/server/src/slab/streams.rs | 4 +++- 5 files changed, 65 insertions(+), 35 deletions(-) diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs index 2678eaee..abf392a9 100644 --- a/core/integration/src/test_server.rs +++ b/core/integration/src/test_server.rs @@ -16,6 +16,15 @@ * under the License. */ +use assert_cmd::prelude::CommandCargoExt; +use async_trait::async_trait; +use derive_more::Display; +use futures::executor::block_on; +use iggy::prelude::UserStatus::Active; +use iggy::prelude::*; +use iggy_common::TransportProtocol; +use rand::Rng; +use server::configs::config_provider::{ConfigProvider, FileConfigProvider}; use std::collections::HashMap; use std::fs; use std::fs::{File, OpenOptions}; @@ -23,20 +32,10 @@ use std::io::Write; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::path::{Path, PathBuf}; use std::process::{Child, Command, Stdio}; -use std::thread::{panicking, sleep}; +use std::thread::{available_parallelism, panicking, sleep}; use std::time::Duration; - -use assert_cmd::prelude::CommandCargoExt; -use async_trait::async_trait; -use derive_more::Display; -use futures::executor::block_on; -use iggy_common::TransportProtocol; use uuid::Uuid; -use iggy::prelude::UserStatus::Active; -use iggy::prelude::*; -use server::configs::config_provider::{ConfigProvider, FileConfigProvider}; - pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH"; pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE"; pub const IPV6_ENV_VAR: &str = "IGGY_TCP_IPV6"; @@ -99,6 +98,28 @@ impl TestServer { } } + // Randomly select 4 CPU cores to reduce interference between parallel tests + let cpu_allocation = match available_parallelism() { + Ok(parallelism) => { + let available_cpus = parallelism.get(); + if available_cpus >= 4 { + let mut rng = rand::thread_rng(); + let max_start = available_cpus - 4; + let start = rng.gen_range(0..=max_start); + let end = start + 4; + format!("{}..{}", start, end) + } else { + "all".to_string() + } + } + Err(_) => "0..4".to_string(), + }; + + envs.insert( + "IGGY_SYSTEM_SHARDING_CPU_ALLOCATION".to_string(), + cpu_allocation, + ); + if ip_kind == IpAddrKind::V6 { envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string()); } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 8e6c06b3..250a52a5 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -235,12 +235,10 @@ pub fn create_shard_connections( shards_set: &HashSet<usize>, ) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) { let shards_count = shards_set.len(); - let mut shards_vec: Vec<usize> = shards_set.iter().cloned().collect(); - shards_vec.sort(); - let connectors: Vec<ShardConnector<ShardFrame>> = shards_vec - .into_iter() - .map(|id| ShardConnector::new(id as u16, shards_count)) + // Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU core numbers + let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count) + .map(|idx| ShardConnector::new(idx as u16, shards_count)) .collect(); let shutdown_handles = connectors diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 37f225e6..36091a5a 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -276,7 +276,7 @@ async fn main() -> Result<(), ServerError> { for (id, cpu_id) in shards_set .into_iter() .enumerate() - .map(|(id, shard_id)| (id as u16, shard_id)) + .map(|(idx, cpu)| (idx as u16, cpu)) { let streams = streams.clone(); let shards_table = shards_table.clone(); diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 00cdaa88..b1d054c7 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -598,26 +598,35 @@ impl IggyShard { }; let batches = if consumer_offset.is_none() { - let batches = self.streams2.get_messages_by_offset(&stream_id, &topic_id, partition_id, 0, count).await?; + let batches = self + .streams2 + .get_messages_by_offset( + &stream_id, + &topic_id, + partition_id, + 0, + count, + ) + .await?; Ok(batches) } else { let consumer_offset = consumer_offset.unwrap(); - let offset = consumer_offset + 1; - trace!( - "Getting next messages for consumer id: {} for partition: {} from offset: {}...", - consumer_id, partition_id, offset - ); - let batches = self - .streams2 - .get_messages_by_offset( - &stream_id, - &topic_id, - partition_id, - offset, - count, - ) - .await?; - Ok(batches) + let offset = consumer_offset + 1; + trace!( + "Getting next messages for consumer id: {} for partition: {} from offset: {}...", + consumer_id, partition_id, offset + ); + let batches = self + .streams2 + .get_messages_by_offset( + &stream_id, + &topic_id, + partition_id, + offset, + count, + ) + .await?; + Ok(batches) }; batches } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 729e85a0..5d3acc83 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -381,7 +381,9 @@ impl MainOps for Streams { }; let Some(consumer_offset) = consumer_offset else { - let batches = self.get_messages_by_offset(stream_id, topic_id, partition_id, 0, count).await?; + let batches = self + .get_messages_by_offset(stream_id, topic_id, partition_id, 0, count) + .await?; return Ok((metadata, batches)); }; let offset = consumer_offset + 1;
