This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_cli_fixes
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 18faf2885546e3f4d1df8044b4a9dd2062595a74
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Oct 3 17:51:02 2025 +0200

    Make test server run on 4 random cores
---
 core/integration/src/test_server.rs | 43 +++++++++++++++++++++++++++----------
 core/server/src/bootstrap.rs        |  8 +++----
 core/server/src/main.rs             |  2 +-
 core/server/src/shard/mod.rs        | 43 ++++++++++++++++++++++---------------
 core/server/src/slab/streams.rs     |  4 +++-
 5 files changed, 65 insertions(+), 35 deletions(-)

diff --git a/core/integration/src/test_server.rs 
b/core/integration/src/test_server.rs
index 2678eaee..abf392a9 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -16,6 +16,15 @@
  * under the License.
  */
 
+use assert_cmd::prelude::CommandCargoExt;
+use async_trait::async_trait;
+use derive_more::Display;
+use futures::executor::block_on;
+use iggy::prelude::UserStatus::Active;
+use iggy::prelude::*;
+use iggy_common::TransportProtocol;
+use rand::Rng;
+use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
 use std::collections::HashMap;
 use std::fs;
 use std::fs::{File, OpenOptions};
@@ -23,20 +32,10 @@ use std::io::Write;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
 use std::path::{Path, PathBuf};
 use std::process::{Child, Command, Stdio};
-use std::thread::{panicking, sleep};
+use std::thread::{available_parallelism, panicking, sleep};
 use std::time::Duration;
-
-use assert_cmd::prelude::CommandCargoExt;
-use async_trait::async_trait;
-use derive_more::Display;
-use futures::executor::block_on;
-use iggy_common::TransportProtocol;
 use uuid::Uuid;
 
-use iggy::prelude::UserStatus::Active;
-use iggy::prelude::*;
-use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
-
 pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
 pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
 pub const IPV6_ENV_VAR: &str = "IGGY_TCP_IPV6";
@@ -99,6 +98,28 @@ impl TestServer {
             }
         }
 
+        // Randomly select 4 CPU cores to reduce interference between parallel 
tests
+        let cpu_allocation = match available_parallelism() {
+            Ok(parallelism) => {
+                let available_cpus = parallelism.get();
+                if available_cpus >= 4 {
+                    let mut rng = rand::thread_rng();
+                    let max_start = available_cpus - 4;
+                    let start = rng.gen_range(0..=max_start);
+                    let end = start + 4;
+                    format!("{}..{}", start, end)
+                } else {
+                    "all".to_string()
+                }
+            }
+            Err(_) => "0..4".to_string(),
+        };
+
+        envs.insert(
+            "IGGY_SYSTEM_SHARDING_CPU_ALLOCATION".to_string(),
+            cpu_allocation,
+        );
+
         if ip_kind == IpAddrKind::V6 {
             envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string());
         }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 8e6c06b3..250a52a5 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -235,12 +235,10 @@ pub fn create_shard_connections(
     shards_set: &HashSet<usize>,
 ) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
     let shards_count = shards_set.len();
-    let mut shards_vec: Vec<usize> = shards_set.iter().cloned().collect();
-    shards_vec.sort();
 
-    let connectors: Vec<ShardConnector<ShardFrame>> = shards_vec
-        .into_iter()
-        .map(|id| ShardConnector::new(id as u16, shards_count))
+    // Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU 
core numbers
+    let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count)
+        .map(|idx| ShardConnector::new(idx as u16, shards_count))
         .collect();
 
     let shutdown_handles = connectors
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 37f225e6..36091a5a 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -276,7 +276,7 @@ async fn main() -> Result<(), ServerError> {
     for (id, cpu_id) in shards_set
         .into_iter()
         .enumerate()
-        .map(|(id, shard_id)| (id as u16, shard_id))
+        .map(|(idx, cpu)| (idx as u16, cpu))
     {
         let streams = streams.clone();
         let shards_table = shards_table.clone();
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 00cdaa88..b1d054c7 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -598,26 +598,35 @@ impl IggyShard {
                         };
 
                         let batches = if consumer_offset.is_none() {
-                            let batches = 
self.streams2.get_messages_by_offset(&stream_id, &topic_id, partition_id, 0, 
count).await?;
+                            let batches = self
+                                .streams2
+                                .get_messages_by_offset(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    0,
+                                    count,
+                                )
+                                .await?;
                             Ok(batches)
                         } else {
                             let consumer_offset = consumer_offset.unwrap();
-                        let offset = consumer_offset + 1;
-                        trace!(
-                            "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
-                            consumer_id, partition_id, offset
-                        );
-                        let batches = self
-                            .streams2
-                            .get_messages_by_offset(
-                                &stream_id,
-                                &topic_id,
-                                partition_id,
-                                offset,
-                                count,
-                            )
-                            .await?;
-                        Ok(batches)
+                            let offset = consumer_offset + 1;
+                            trace!(
+                                "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
+                                consumer_id, partition_id, offset
+                            );
+                            let batches = self
+                                .streams2
+                                .get_messages_by_offset(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    offset,
+                                    count,
+                                )
+                                .await?;
+                            Ok(batches)
                         };
                         batches
                     }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 729e85a0..5d3acc83 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -381,7 +381,9 @@ impl MainOps for Streams {
                 };
 
                 let Some(consumer_offset) = consumer_offset else {
-                    let batches = self.get_messages_by_offset(stream_id, 
topic_id, partition_id, 0, count).await?;
+                    let batches = self
+                        .get_messages_by_offset(stream_id, topic_id, 
partition_id, 0, count)
+                        .await?;
                     return Ok((metadata, batches));
                 };
                 let offset = consumer_offset + 1;

Reply via email to