This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new deb9ff71 refactor(integration): consolidate server tests using test matrices (#1901)
deb9ff71 is described below

commit deb9ff717ebd9fef16a6390af8dae70c984ec4c5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Jun 22 20:41:03 2025 +0200

    refactor(integration): consolidate server tests using test matrices (#1901)
    
    This refactoring eliminates code duplication across transport-specific
    server tests (TCP, QUIC, HTTP) by introducing a unified test matrix
    approach. The same test scenarios now run across all supported
    transports automatically, ensuring consistent testing coverage while
    reducing maintenance overhead.
    
    Key changes:
    - Moved benchmark utilities to a shared module (bench_utils)
    - Added transport() and server_addr() methods to ClientFactory trait
    - Created test matrices for general scenarios (all transports) and
      consumer group scenarios (TCP/QUIC only)
    - Consolidated data integrity tests to use parameterized cache settings
    - Removed transport-specific test files in favor of unified approach
    - Enhanced restart tests to verify data persistence with different
      segment cache configurations
    
    This change improves test maintainability by having a single source
    of truth for each test scenario while preserving the ability to run
    transport-specific tests where needed.
---
 .gitignore                                         |   2 +-
 .../{tests/bench/mod.rs => src/bench_utils.rs}     |  53 ++--
 core/integration/src/http_client.rs                |  10 +-
 core/integration/src/lib.rs                        |   1 +
 core/integration/src/quic_client.rs                |  10 +-
 core/integration/src/tcp_client.rs                 |  10 +-
 core/integration/src/test_server.rs                |   2 +
 core/integration/tests/bench/http.rs               |  64 -----
 core/integration/tests/bench/quic.rs               |  64 -----
 core/integration/tests/bench/tcp.rs                |  64 -----
 .../data_integrity/verify_after_server_restart.rs  |  95 +++++--
 core/integration/tests/mod.rs                      |   1 -
 core/integration/tests/server/cg.rs                |  38 +++
 core/integration/tests/server/general.rs           |  41 +++
 core/integration/tests/server/http_server.rs       |  73 ------
 core/integration/tests/server/mod.rs               |  84 ++++++-
 core/integration/tests/server/quic_server.rs       | 105 --------
 .../server/{mod.rs => scenarios/bench_scenario.rs} |  15 +-
 .../server/scenarios/delete_segments_scenario.rs   |   5 +-
 core/integration/tests/server/scenarios/mod.rs     |  12 +-
 .../server/scenarios/server_restart_scenario.rs    | 280 ---------------------
 .../tests/server/scenarios/tcp_tls_scenario.rs     |   2 +
 core/integration/tests/server/specific.rs          | 110 ++++++++
 core/integration/tests/server/tcp_server.rs        | 277 --------------------
 .../tests/streaming/get_by_timestamp.rs            |  14 +-
 25 files changed, 420 insertions(+), 1012 deletions(-)

diff --git a/.gitignore b/.gitignore
index 806fdb39..d4da1bae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,7 +5,7 @@ target*
 .DS_Store
 .gradle
 local_data*
-bench_*
+/bench_*
 /sdk/errors_table/
 /rust-clippy-results.sarif
 /performance_results*
diff --git a/core/integration/tests/bench/mod.rs b/core/integration/src/bench_utils.rs
similarity index 77%
rename from core/integration/tests/bench/mod.rs
rename to core/integration/src/bench_utils.rs
index 97b0dcb1..579d2e11 100644
--- a/core/integration/tests/bench/mod.rs
+++ b/core/integration/src/bench_utils.rs
@@ -1,28 +1,23 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-mod http;
-mod quic;
-mod tcp;
-
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::test_server::Transport;
 use assert_cmd::prelude::CommandCargoExt;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{TEST_VERBOSITY_ENV_VAR, Transport};
+use iggy::prelude::*;
 use std::{
     fs::{self, File, OpenOptions},
     io::Write,
@@ -38,7 +33,7 @@ const DEFAULT_NUMBER_OF_STREAMS: u64 = 8;
 
 pub fn run_bench_and_wait_for_finish(
     server_addr: &str,
-    transport: Transport,
+    transport: &Transport,
     bench: &str,
     amount_of_data_to_process: IggyByteSize,
 ) {
@@ -47,7 +42,8 @@ pub fn run_bench_and_wait_for_finish(
     let mut stderr_file_path = None;
     let mut stdout_file_path = None;
 
-    if std::env::var(TEST_VERBOSITY_ENV_VAR).is_err() {
+    let test_verbosity_env_var = "IGGY_TEST_VERBOSE";
+    if std::env::var(test_verbosity_env_var).is_err() {
         let stderr_file = get_random_path();
         let stdout_file = get_random_path();
         stderr_file_path = Some(stderr_file);
@@ -68,7 +64,7 @@ pub fn run_bench_and_wait_for_finish(
         "--message-size",
         &message_size.to_string(),
         bench,
-        &format!("{}", transport),
+        &transport.to_string(),
         "--server-address",
         server_addr,
     ]);
@@ -76,7 +72,7 @@ pub fn run_bench_and_wait_for_finish(
     // By default, all iggy-bench logs are redirected to files,
     // and dumped to stderr when test fails. With IGGY_TEST_VERBOSE=1
     // logs are dumped to stdout during test execution.
-    if std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok() {
+    if std::env::var(test_verbosity_env_var).is_ok() {
         command.stdout(Stdio::inherit());
         command.stderr(Stdio::inherit());
     } else {
@@ -127,7 +123,6 @@ pub fn run_bench_and_wait_for_finish(
         panic!("Failed to get output from iggy-bench");
     }
 
-    // TODO: fix this, it needs to be called in Drop
     if panicking() {
         if let Some(stdout_file_path) = &stdout_file_path {
             eprintln!(
diff --git a/core/integration/src/http_client.rs b/core/integration/src/http_client.rs
index afd566ed..8d201d31 100644
--- a/core/integration/src/http_client.rs
+++ b/core/integration/src/http_client.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::test_server::ClientFactory;
+use crate::test_server::{ClientFactory, Transport};
 use async_trait::async_trait;
 use iggy::http::http_client::HttpClient;
 use iggy::prelude::{Client, HttpClientConfig};
@@ -37,6 +37,14 @@ impl ClientFactory for HttpClientFactory {
         let client = HttpClient::create(Arc::new(config)).unwrap();
         Box::new(client)
     }
+
+    fn transport(&self) -> Transport {
+        Transport::Http
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
 }
 
 unsafe impl Send for HttpClientFactory {}
diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs
index be388600..1f65508b 100644
--- a/core/integration/src/lib.rs
+++ b/core/integration/src/lib.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod bench_utils;
 pub mod file;
 #[allow(deprecated)]
 pub mod http_client;
diff --git a/core/integration/src/quic_client.rs b/core/integration/src/quic_client.rs
index a8ef2446..0205fc40 100644
--- a/core/integration/src/quic_client.rs
+++ b/core/integration/src/quic_client.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::test_server::ClientFactory;
+use crate::test_server::{ClientFactory, Transport};
 use async_trait::async_trait;
 use iggy::prelude::{Client, QuicClientConfig};
 use iggy::quic::quick_client::QuicClient;
@@ -38,6 +38,14 @@ impl ClientFactory for QuicClientFactory {
         Client::connect(&client).await.unwrap();
         Box::new(client)
     }
+
+    fn transport(&self) -> Transport {
+        Transport::Quic
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
 }
 
 unsafe impl Send for QuicClientFactory {}
diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs
index 35f4a894..a5a4ae04 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::test_server::ClientFactory;
+use crate::test_server::{ClientFactory, Transport};
 use async_trait::async_trait;
 use iggy::prelude::{Client, TcpClient, TcpClientConfig};
 use std::sync::Arc;
@@ -49,6 +49,14 @@ impl ClientFactory for TcpClientFactory {
         });
         Box::new(client)
     }
+
+    fn transport(&self) -> Transport {
+        Transport::Tcp
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
 }
 
 unsafe impl Send for TcpClientFactory {}
diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index 322aa054..312ece59 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -54,6 +54,8 @@ pub enum IpAddrKind {
 #[async_trait]
 pub trait ClientFactory: Sync + Send {
     async fn create_client(&self) -> Box<dyn Client>;
+    fn transport(&self) -> Transport;
+    fn server_addr(&self) -> String;
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Display)]
diff --git a/core/integration/tests/bench/http.rs b/core/integration/tests/bench/http.rs
deleted file mode 100644
index f5bee4fb..00000000
--- a/core/integration/tests/bench/http.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::run_bench_and_wait_for_finish;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{IpAddrKind, TestServer, Transport};
-use serial_test::parallel;
-use std::str::FromStr;
-
-#[test]
-#[parallel]
-fn http_ipv4_bench() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Http,
-        "pinned-producer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Http,
-        "pinned-consumer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-}
-
-#[cfg_attr(feature = "ci-qemu", ignore)]
-#[test]
-#[parallel]
-fn http_ipv6_bench() {
-    let mut test_server = TestServer::new(None, true, None, IpAddrKind::V6);
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Http,
-        "pinned-producer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Http,
-        "pinned-consumer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-}
diff --git a/core/integration/tests/bench/quic.rs b/core/integration/tests/bench/quic.rs
deleted file mode 100644
index ce3cd193..00000000
--- a/core/integration/tests/bench/quic.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::run_bench_and_wait_for_finish;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{IpAddrKind, TestServer, Transport};
-use serial_test::parallel;
-use std::str::FromStr;
-
-#[test]
-#[parallel]
-fn quic_ipv4_bench() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Quic,
-        "pinned-producer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Quic,
-        "pinned-consumer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-}
-
-#[cfg_attr(feature = "ci-qemu", ignore)]
-#[test]
-#[parallel]
-fn quic_ipv6_bench() {
-    let mut test_server = TestServer::new(None, true, None, IpAddrKind::V6);
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Quic,
-        "pinned-producer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Quic,
-        "pinned-consumer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-}
diff --git a/core/integration/tests/bench/tcp.rs b/core/integration/tests/bench/tcp.rs
deleted file mode 100644
index 7b1fed9a..00000000
--- a/core/integration/tests/bench/tcp.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::run_bench_and_wait_for_finish;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{IpAddrKind, TestServer, Transport};
-use serial_test::parallel;
-use std::str::FromStr;
-
-#[test]
-#[parallel]
-fn tcp_ipv4_bench() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Tcp,
-        "pinned-producer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Tcp,
-        "pinned-consumer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-}
-
-#[cfg_attr(feature = "ci-qemu", ignore)]
-#[test]
-#[parallel]
-fn tcp_ipv6_bench() {
-    let mut test_server = TestServer::new(None, true, None, IpAddrKind::V6);
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Tcp,
-        "pinned-producer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        Transport::Tcp,
-        "pinned-consumer",
-        IggyByteSize::from_str("8MB").unwrap(),
-    );
-}
diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 8c11495b..22939896 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -16,9 +16,9 @@
  * under the License.
  */
 
-use crate::bench::run_bench_and_wait_for_finish;
 use iggy::clients::client::IggyClient;
 use iggy::prelude::{Identifier, IggyByteSize, MessageClient, SystemClient};
+use integration::bench_utils::run_bench_and_wait_for_finish;
 use integration::{
     tcp_client::TcpClientFactory,
     test_server::{
@@ -27,13 +27,42 @@ use integration::{
 };
 use serial_test::parallel;
 use std::{collections::HashMap, str::FromStr};
+use test_case::test_matrix;
+
+/*
+ * Helper functions for test matrix parameters
+ */
+
+fn cache_open_segment() -> &'static str {
+    "open_segment"
+}
+
+fn cache_all() -> &'static str {
+    "all"
+}
+
+fn cache_none() -> &'static str {
+    "none"
+}
 
 // TODO(numminex) - Move the message generation method from benchmark run to a special method.
+#[test_matrix([cache_open_segment(), cache_all(), cache_none()])]
 #[tokio::test]
 #[parallel]
-async fn should_fill_data_and_verify_after_restart() {
-    // 1. Start server
-    let mut test_server = TestServer::new(None, false, None, IpAddrKind::V4);
+async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) {
+    // 1. Start server with cache configuration
+    let env_vars = HashMap::from([
+        (
+            SYSTEM_PATH_ENV_VAR.to_owned(),
+            TestServer::get_random_path(),
+        ),
+        (
+            "IGGY_SEGMENT_CACHE_INDEXES".to_string(),
+            cache_setting.to_string(),
+        ),
+    ]);
+
+    let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, IpAddrKind::V4);
     test_server.start();
     let server_addr = test_server.get_raw_tcp_addr().unwrap();
     let local_data_path = test_server.get_local_data_path().to_owned();
@@ -42,7 +71,7 @@ async fn should_fill_data_and_verify_after_restart() {
     let amount_of_data_to_process = IggyByteSize::from_str("5 MB").unwrap();
     run_bench_and_wait_for_finish(
         &server_addr,
-        Transport::Tcp,
+        &Transport::Tcp,
         "pinned-producer",
         amount_of_data_to_process,
     );
@@ -50,7 +79,7 @@ async fn should_fill_data_and_verify_after_restart() {
     // 3. Run poll bench to check if everything's OK
     run_bench_and_wait_for_finish(
         &server_addr,
-        Transport::Tcp,
+        &Transport::Tcp,
         "pinned-consumer",
         amount_of_data_to_process,
     );
@@ -94,18 +123,32 @@ async fn should_fill_data_and_verify_after_restart() {
     let expected_clients_count = stats.clients_count;
     let expected_consumer_groups_count = stats.consumer_groups_count;
 
-    // 6. Stop server, remove current config file to properly fetch new server TCP address
+    // 6. Stop server
     test_server.stop();
     drop(test_server);
-    std::fs::remove_file(local_data_path.clone() + "/runtime/current_config.toml").unwrap();
 
-    // 7. Restart server
-    let extra_envs = HashMap::from([(SYSTEM_PATH_ENV_VAR.to_owned(), local_data_path.clone())]);
-    let mut test_server = TestServer::new(Some(extra_envs), false, None, IpAddrKind::V4);
+    // 7. Restart server with same settings
+    let mut test_server = TestServer::new(Some(env_vars), false, None, IpAddrKind::V4);
     test_server.start();
     let server_addr = test_server.get_raw_tcp_addr().unwrap();
 
-    // 8. Connect and login to newly started server
+    // 8. Run send bench again to add more data
+    run_bench_and_wait_for_finish(
+        &server_addr,
+        &Transport::Tcp,
+        "pinned-producer",
+        amount_of_data_to_process,
+    );
+
+    // 9. Run poll bench again to check if all data is still there
+    run_bench_and_wait_for_finish(
+        &server_addr,
+        &Transport::Tcp,
+        "pinned-consumer",
+        IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
+    );
+
+    // 10. Connect and login to newly started server
     let client = IggyClient::create(
         TcpClientFactory {
             server_addr: server_addr.clone(),
@@ -118,7 +161,7 @@ async fn should_fill_data_and_verify_after_restart() {
     );
     login_root(&client).await;
 
-    // 9. Save stats from the second server
+    // 11. Save stats from the second server (should have double the data)
     let stats = client.get_stats().await.unwrap();
     let actual_messages_size_bytes = stats.messages_size_bytes;
     let actual_streams_count = stats.streams_count;
@@ -129,10 +172,11 @@ async fn should_fill_data_and_verify_after_restart() {
     let actual_clients_count = stats.clients_count;
     let actual_consumer_groups_count = stats.consumer_groups_count;
 
-    // 10. Compare stats
+    // 12. Compare stats (expecting double the messages/size after second bench run)
     assert_eq!(
-        expected_messages_size_bytes, actual_messages_size_bytes,
-        "Messages size bytes"
+        expected_messages_size_bytes.as_bytes_usize() * 2,
+        actual_messages_size_bytes.as_bytes_usize(),
+        "Messages size bytes should be doubled"
     );
     assert_eq!(
         expected_streams_count, actual_streams_count,
@@ -143,13 +187,14 @@ async fn should_fill_data_and_verify_after_restart() {
         expected_partitions_count, actual_partitions_count,
         "Partitions count"
     );
-    assert_eq!(
-        expected_segments_count, actual_segments_count,
-        "Segments count"
+    assert!(
+        actual_segments_count >= expected_segments_count,
+        "Segments count should be at least the same or more"
     );
     assert_eq!(
-        expected_messages_count, actual_messages_count,
-        "Messages count"
+        expected_messages_count * 2,
+        actual_messages_count,
+        "Messages count should be doubled"
     );
     assert_eq!(
         expected_clients_count, actual_clients_count,
@@ -160,14 +205,14 @@ async fn should_fill_data_and_verify_after_restart() {
         "Consumer groups count"
     );
 
-    // 11. Again run poll bench to check if data is still there
+    // 13. Run poll bench to check if all data (10MB total) is still there
     run_bench_and_wait_for_finish(
         &server_addr,
-        Transport::Tcp,
+        &Transport::Tcp,
         "pinned-consumer",
-        amount_of_data_to_process,
+        IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
     );
 
-    // 12. Manual cleanup
+    // 14. Manual cleanup
     std::fs::remove_dir_all(local_data_path).unwrap();
 }
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index b0ec21d8..9d294947 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -26,7 +26,6 @@ use std::sync::{Arc, Once};
 use std::{panic, thread};
 
 mod archiver;
-mod bench;
 mod cli;
 mod config_provider;
 mod data_integrity;
diff --git a/core/integration/tests/server/cg.rs b/core/integration/tests/server/cg.rs
new file mode 100644
index 00000000..8ee3857e
--- /dev/null
+++ b/core/integration/tests/server/cg.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::server::{
+    ScenarioFn, join_scenario, multiple_clients_scenario, run_scenario, single_client_scenario,
+};
+use integration::test_server::Transport;
+use serial_test::parallel;
+use test_case::test_matrix;
+
+// Consumer group scenarios do not support HTTP
+#[test_matrix(
+    [Transport::Tcp, Transport::Quic],
+    [
+        join_scenario(),
+        single_client_scenario(),
+        multiple_clients_scenario(),
+    ]
+)]
+#[tokio::test]
+#[parallel]
+async fn matrix(transport: Transport, scenario: ScenarioFn) {
+    run_scenario(transport, scenario).await;
+}
diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs
new file mode 100644
index 00000000..e98403b0
--- /dev/null
+++ b/core/integration/tests/server/general.rs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::server::{
+    ScenarioFn, bench_scenario, create_message_payload_scenario, message_headers_scenario,
+    run_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
+};
+use integration::test_server::Transport;
+use serial_test::parallel;
+use test_case::test_matrix;
+
+#[test_matrix(
+    [Transport::Tcp, Transport::Quic, Transport::Http],
+    [
+        system_scenario(),
+        user_scenario(),
+        message_headers_scenario(),
+        create_message_payload_scenario(),
+        stream_size_validation_scenario(),
+        bench_scenario(),
+    ]
+)]
+#[tokio::test]
+#[parallel]
+async fn matrix(transport: Transport, scenario: ScenarioFn) {
+    run_scenario(transport, scenario).await;
+}
diff --git a/core/integration/tests/server/http_server.rs b/core/integration/tests/server/http_server.rs
deleted file mode 100644
index fce1f1a8..00000000
--- a/core/integration/tests/server/http_server.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::server::scenarios::{
-    create_message_payload, stream_size_validation_scenario, system_scenario, user_scenario,
-};
-use integration::{http_client::HttpClientFactory, test_server::TestServer};
-use serial_test::parallel;
-
-#[tokio::test]
-#[parallel]
-async fn create_message_payload_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    let client_factory = HttpClientFactory { server_addr };
-    create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_headers_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    let client_factory = HttpClientFactory { server_addr };
-    create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn stream_size_validation_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    let client_factory = HttpClientFactory { server_addr };
-    stream_size_validation_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn system_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    let client_factory = HttpClientFactory { server_addr };
-    system_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn user_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_http_api_addr().unwrap();
-    let client_factory = HttpClientFactory { server_addr };
-    user_scenario::run(&client_factory).await;
-}
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index c9be19a8..6d7fa1ed 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -16,7 +16,85 @@
  * under the License.
  */
 
-mod http_server;
-mod quic_server;
+mod cg;
+mod general;
 mod scenarios;
-mod tcp_server;
+mod specific;
+
+use integration::{
+    http_client::HttpClientFactory,
+    quic_client::QuicClientFactory,
+    tcp_client::TcpClientFactory,
+    test_server::{ClientFactory, TestServer, Transport},
+};
+use scenarios::{
+    bench_scenario, consumer_group_join_scenario,
+    consumer_group_with_multiple_clients_polling_messages_scenario,
+    consumer_group_with_single_client_polling_messages_scenario, create_message_payload,
+    message_headers_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
+};
+use std::future::Future;
+use std::pin::Pin;
+
+type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + '_>>;
+
+fn system_scenario() -> ScenarioFn {
+    |factory| Box::pin(system_scenario::run(factory))
+}
+
+fn user_scenario() -> ScenarioFn {
+    |factory| Box::pin(user_scenario::run(factory))
+}
+
+fn message_headers_scenario() -> ScenarioFn {
+    |factory| Box::pin(message_headers_scenario::run(factory))
+}
+
+fn create_message_payload_scenario() -> ScenarioFn {
+    |factory| Box::pin(create_message_payload::run(factory))
+}
+
+fn join_scenario() -> ScenarioFn {
+    |factory| Box::pin(consumer_group_join_scenario::run(factory))
+}
+
+fn stream_size_validation_scenario() -> ScenarioFn {
+    |factory| Box::pin(stream_size_validation_scenario::run(factory))
+}
+
+fn single_client_scenario() -> ScenarioFn {
+    |factory| Box::pin(consumer_group_with_single_client_polling_messages_scenario::run(factory))
+}
+
+fn multiple_clients_scenario() -> ScenarioFn {
+    |factory| Box::pin(consumer_group_with_multiple_clients_polling_messages_scenario::run(factory))
+}
+
+fn bench_scenario() -> ScenarioFn {
+    |factory| Box::pin(bench_scenario::run(factory))
+}
+
+async fn run_scenario(transport: Transport, scenario: ScenarioFn) {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let client_factory: Box<dyn ClientFactory> = match transport {
+        Transport::Tcp => {
+            let server_addr = test_server.get_raw_tcp_addr().unwrap();
+            Box::new(TcpClientFactory {
+                server_addr,
+                ..Default::default()
+            })
+        }
+        Transport::Quic => {
+            let server_addr = test_server.get_quic_udp_addr().unwrap();
+            Box::new(QuicClientFactory { server_addr })
+        }
+        Transport::Http => {
+            let server_addr = test_server.get_http_api_addr().unwrap();
+            Box::new(HttpClientFactory { server_addr })
+        }
+    };
+
+    scenario(&*client_factory).await;
+}
diff --git a/core/integration/tests/server/quic_server.rs b/core/integration/tests/server/quic_server.rs
deleted file mode 100644
index fcad0129..00000000
--- a/core/integration/tests/server/quic_server.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::server::scenarios::{
-    consumer_group_join_scenario, consumer_group_with_multiple_clients_polling_messages_scenario,
-    consumer_group_with_single_client_polling_messages_scenario, create_message_payload,
-    message_headers_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
-};
-use integration::{quic_client::QuicClientFactory, test_server::TestServer};
-use serial_test::parallel;
-
-#[tokio::test]
-#[parallel]
-async fn system_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    system_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn user_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    user_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_headers_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    message_headers_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn create_message_payload_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_join_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    consumer_group_join_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_with_single_client_polling_messages_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    consumer_group_with_single_client_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_with_multiple_clients_polling_messages_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    consumer_group_with_multiple_clients_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn stream_size_validation_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_quic_udp_addr().unwrap();
-    let client_factory = QuicClientFactory { server_addr };
-    stream_size_validation_scenario::run(&client_factory).await;
-}
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/scenarios/bench_scenario.rs
similarity index 61%
copy from core/integration/tests/server/mod.rs
copy to core/integration/tests/server/scenarios/bench_scenario.rs
index c9be19a8..990253f2 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/scenarios/bench_scenario.rs
@@ -16,7 +16,14 @@
  * under the License.
  */
 
-mod http_server;
-mod quic_server;
-mod scenarios;
-mod tcp_server;
+use iggy::prelude::*;
+use integration::{bench_utils::run_bench_and_wait_for_finish, test_server::ClientFactory};
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let server_addr = client_factory.server_addr();
+    let transport = client_factory.transport();
+    let data_size = IggyByteSize::from(8 * 1024 * 1024);
+
+    run_bench_and_wait_for_finish(&server_addr, &transport, "pinned-producer", data_size);
+    run_bench_and_wait_for_finish(&server_addr, &transport, "pinned-consumer", data_size);
+}
diff --git a/core/integration/tests/server/scenarios/delete_segments_scenario.rs b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
index 8cda05b5..d2e65f0c 100644
--- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs
+++ b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
@@ -27,10 +27,7 @@ const TOPIC_NAME: &str = "test_topic";
 const PARTITION_ID: u32 = 1;
 const LOG_EXTENSION: &str = "log";
 
-pub async fn test_delete_segments_scenario(
-    client_factory: &dyn ClientFactory,
-    test_server: &TestServer,
-) {
+pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer) {
     let client = client_factory.create_client().await;
     let client = IggyClient::create(client, None, None);
 
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 74676897..4671a95e 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -16,13 +16,7 @@
  * under the License.
  */
 
-use iggy::clients::client::IggyClient;
-use iggy::prelude::ConsumerGroupDetails;
-use iggy::prelude::ConsumerKind;
-use iggy::prelude::Identifier;
-use iggy::prelude::{ConsumerGroupClient, StreamClient};
-use integration::test_server::{ClientFactory, delete_user};
-
+pub mod bench_scenario;
 pub mod consumer_group_join_scenario;
 pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
 pub mod consumer_group_with_single_client_polling_messages_scenario;
@@ -30,12 +24,14 @@ pub mod create_message_payload;
 pub mod delete_segments_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
-pub mod server_restart_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
 pub mod tcp_tls_scenario;
 pub mod user_scenario;
 
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, delete_user};
+
 const STREAM_ID: u32 = 1;
 const TOPIC_ID: u32 = 1;
 const PARTITION_ID: u32 = 1;
diff --git a/core/integration/tests/server/scenarios/server_restart_scenario.rs b/core/integration/tests/server/scenarios/server_restart_scenario.rs
deleted file mode 100644
index f41febd1..00000000
--- a/core/integration/tests/server/scenarios/server_restart_scenario.rs
+++ /dev/null
@@ -1,280 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use bytes::Bytes;
-use iggy::prelude::*;
-use integration::test_server::ClientFactory;
-
-const STREAM_ID: u32 = 1;
-const TOPIC_ID: u32 = 1;
-const PARTITION_ID: u32 = 1;
-const PARTITIONS_COUNT: u32 = 1;
-const MESSAGE_SIZE: usize = 50_000;
-
-pub async fn run(client_factory: &dyn ClientFactory, expected_messages_before_restart: u32) {
-    if expected_messages_before_restart == 0 {
-        run_initial_phase(client_factory).await;
-    } else {
-        run_after_restart_phase(client_factory, expected_messages_before_restart).await;
-    }
-}
-
-async fn run_initial_phase(client_factory: &dyn ClientFactory) {
-    let consumer = Consumer {
-        kind: ConsumerKind::Consumer,
-        id: Identifier::numeric(1).unwrap(),
-    };
-
-    let client = client_factory.create_client().await;
-    let client = IggyClient::create(client, None, None);
-
-    client
-        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
-        .await
-        .unwrap();
-
-    client
-        .create_stream("test-stream", Some(STREAM_ID))
-        .await
-        .unwrap();
-
-    let stream = client
-        .get_stream(&Identifier::numeric(STREAM_ID).unwrap())
-        .await
-        .unwrap()
-        .expect("Initial stream should exist");
-
-    assert_eq!(stream.id, STREAM_ID);
-    assert_eq!(stream.name, "test-stream");
-
-    client
-        .create_topic(
-            &Identifier::numeric(STREAM_ID).unwrap(),
-            "test-topic",
-            PARTITIONS_COUNT,
-            CompressionAlgorithm::None,
-            None,
-            Some(TOPIC_ID),
-            IggyExpiry::NeverExpire,
-            MaxTopicSize::ServerDefault,
-        )
-        .await
-        .unwrap();
-
-    let messages_per_batch = 4;
-    let mut total_messages = 0;
-
-    for batch in 0..3 {
-        let mut messages = create_messages(batch * messages_per_batch, messages_per_batch);
-        client
-            .send_messages(
-                &Identifier::numeric(STREAM_ID).unwrap(),
-                &Identifier::numeric(TOPIC_ID).unwrap(),
-                &Partitioning::partition_id(PARTITION_ID),
-                &mut messages,
-            )
-            .await
-            .unwrap();
-        total_messages += messages_per_batch;
-    }
-
-    let polled_messages = client
-        .poll_messages(
-            &Identifier::numeric(STREAM_ID).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
-            Some(PARTITION_ID),
-            &consumer,
-            &PollingStrategy::offset(0),
-            total_messages,
-            false,
-        )
-        .await
-        .unwrap();
-
-    assert_eq!(polled_messages.messages.len() as u32, total_messages);
-    verify_messages(&polled_messages.messages, 0, total_messages);
-}
-
-async fn run_after_restart_phase(
-    client_factory: &dyn ClientFactory,
-    expected_messages_before_restart: u32,
-) {
-    let consumer = Consumer {
-        kind: ConsumerKind::Consumer,
-        id: Identifier::numeric(1).unwrap(),
-    };
-
-    let client = client_factory.create_client().await;
-    let client = IggyClient::create(client, None, None);
-
-    client
-        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
-        .await
-        .unwrap();
-
-    let stream = client
-        .get_stream(&Identifier::numeric(STREAM_ID).unwrap())
-        .await
-        .unwrap()
-        .expect("Stream should exist after restart");
-
-    assert_eq!(stream.id, STREAM_ID);
-    assert_eq!(stream.name, "test-stream");
-
-    let topic = client
-        .get_topic(
-            &Identifier::numeric(STREAM_ID).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
-        )
-        .await
-        .unwrap()
-        .expect("Topic should exist after restart");
-
-    assert_eq!(topic.id, TOPIC_ID);
-    assert_eq!(topic.name, "test-topic");
-    assert_eq!(
-        topic.messages_count,
-        expected_messages_before_restart as u64
-    );
-
-    let messages_per_batch = 4;
-    let second_phase_messages = messages_per_batch * 2;
-
-    for batch in 0..2 {
-        let start_id = expected_messages_before_restart + (batch * messages_per_batch);
-        let mut messages = create_messages(start_id, messages_per_batch);
-        client
-            .send_messages(
-                &Identifier::numeric(STREAM_ID).unwrap(),
-                &Identifier::numeric(TOPIC_ID).unwrap(),
-                &Partitioning::partition_id(PARTITION_ID),
-                &mut messages,
-            )
-            .await
-            .unwrap();
-    }
-
-    let total_messages = expected_messages_before_restart + second_phase_messages;
-
-    // First, poll messages one-by-one to verify individual message access
-    for i in 0..total_messages {
-        let single_message = client
-            .poll_messages(
-                &Identifier::numeric(STREAM_ID).unwrap(),
-                &Identifier::numeric(TOPIC_ID).unwrap(),
-                Some(PARTITION_ID),
-                &consumer,
-                &PollingStrategy::offset(i as u64),
-                1,
-                false,
-            )
-            .await
-            .unwrap_or_else(|error| {
-                panic!("Failed to poll message at offset {i}: {error}");
-            });
-
-        assert_eq!(single_message.messages.len(), 1);
-        let msg = &single_message.messages[0];
-        assert_eq!(msg.header.offset, i as u64);
-
-        let payload_str = String::from_utf8_lossy(&msg.payload);
-        assert!(payload_str.starts_with(&format!("Message {} -", i)));
-    }
-
-    // Then poll all messages at once
-    let all_messages = client
-        .poll_messages(
-            &Identifier::numeric(STREAM_ID).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
-            Some(PARTITION_ID),
-            &consumer,
-            &PollingStrategy::offset(0),
-            total_messages,
-            false,
-        )
-        .await
-        .unwrap();
-
-    assert_eq!(all_messages.messages.len() as u32, total_messages);
-
-    verify_messages(&all_messages.messages, 0, total_messages);
-
-    let new_messages = client
-        .poll_messages(
-            &Identifier::numeric(STREAM_ID).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
-            Some(PARTITION_ID),
-            &consumer,
-            &PollingStrategy::offset(expected_messages_before_restart as u64),
-            second_phase_messages,
-            false,
-        )
-        .await
-        .unwrap();
-
-    assert_eq!(new_messages.messages.len() as u32, second_phase_messages);
-    verify_messages(
-        &new_messages.messages,
-        expected_messages_before_restart,
-        second_phase_messages,
-    );
-
-    let final_topic = client
-        .get_topic(
-            &Identifier::numeric(STREAM_ID).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
-        )
-        .await
-        .unwrap()
-        .expect("Failed to get topic");
-
-    assert_eq!(final_topic.messages_count, total_messages as u64);
-    println!(
-        "Test completed: {} total messages across {} segments",
-        total_messages, final_topic.partitions[0].segments_count
-    );
-}
-
-fn create_messages(start_id: u32, count: u32) -> Vec<IggyMessage> {
-    let mut messages = Vec::new();
-    let large_payload = "x".repeat(MESSAGE_SIZE);
-
-    for i in 0..count {
-        let id = (start_id + i + 1) as u128;
-        let payload = format!("Message {} - {}", start_id + i, large_payload);
-        messages.push(
-            IggyMessage::builder()
-                .id(id)
-                .payload(Bytes::from(payload))
-                .build()
-                .expect("Failed to create message"),
-        );
-    }
-    messages
-}
-
-fn verify_messages(messages: &[IggyMessage], start_offset: u32, _count: u32) {
-    for (idx, message) in messages.iter().enumerate() {
-        let expected_offset = start_offset + idx as u32;
-        assert_eq!(message.header.offset, expected_offset as u64);
-
-        let payload_str = String::from_utf8_lossy(&message.payload);
-        assert!(payload_str.starts_with(&format!("Message {} -", expected_offset)));
-        assert!(payload_str.len() > MESSAGE_SIZE);
-    }
-}
diff --git a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
index aa70cafe..315cf5ea 100644
--- a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
+++ b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
@@ -60,6 +60,7 @@ pub async fn run(client: &IggyClient) {
             .build()
             .unwrap(),
     ];
+
     client
         .send_messages(
             &Identifier::numeric(stream_id).unwrap(),
@@ -90,5 +91,6 @@ pub async fn run(client: &IggyClient) {
         .delete_stream(&Identifier::numeric(stream_id).unwrap())
         .await
         .unwrap();
+
     assert_clean_system(client).await;
 }
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
new file mode 100644
index 00000000..d2d28e37
--- /dev/null
+++ b/core/integration/tests/server/specific.rs
@@ -0,0 +1,110 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{delete_segments_scenario, message_size_scenario, tcp_tls_scenario};
+use iggy::prelude::*;
+use integration::{
+    tcp_client::TcpClientFactory,
+    test_server::{IpAddrKind, TestServer},
+    test_tls_utils::generate_test_certificates,
+};
+use serial_test::parallel;
+use std::collections::HashMap;
+
+// This test can run on any transport, but it requires both ClientFactory and
+// TestServer parameters, which doesn't fit the unified matrix approach.
+#[tokio::test]
+#[parallel]
+async fn should_delete_segments_and_validate_filesystem() {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "1MiB".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    delete_segments_scenario::run(&client_factory, &test_server).await;
+}
+
+// TCP TLS scenario is obviously specific to TCP transport, and requires special
+// setup so it's not included in the matrix.
+#[tokio::test]
+#[parallel]
+async fn tcp_tls_scenario_should_be_valid() {
+    let temp_dir = tempfile::TempDir::new().expect("Failed to create temp dir");
+    let cert_dir = temp_dir.path();
+    let cert_dir_str = cert_dir.to_str().unwrap();
+
+    generate_test_certificates(cert_dir_str).expect("Failed to generate test certificates");
+
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string());
+    extra_envs.insert(
+        "IGGY_TCP_TLS_CERT_FILE".to_string(),
+        cert_dir.join("test_cert.pem").to_str().unwrap().to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_TCP_TLS_KEY_FILE".to_string(),
+        cert_dir.join("test_key.pem").to_str().unwrap().to_string(),
+    );
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let cert_path = cert_dir.join("test_cert.pem").to_str().unwrap().to_string();
+
+    let client = IggyClientBuilder::new()
+        .with_tcp()
+        .with_server_address(server_addr)
+        .with_tls_enabled(true)
+        .with_tls_domain("localhost".to_string())
+        .with_tls_ca_file(cert_path)
+        .build()
+        .expect("Failed to create TLS client");
+
+    client
+        .connect()
+        .await
+        .expect("Failed to connect TLS client");
+
+    let client = IggyClient::create(Box::new(client), None, None);
+
+    tcp_tls_scenario::run(&client).await;
+}
+
+// Message size scenario is specific to TCP transport to test the behavior around the maximum message size.
+// When run on other transports, it will fail because both QUIC and HTTP have different message size limits.
+#[tokio::test]
+#[parallel]
+async fn message_size_scenario() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    message_size_scenario::run(&client_factory).await;
+}
diff --git a/core/integration/tests/server/tcp_server.rs b/core/integration/tests/server/tcp_server.rs
deleted file mode 100644
index 550d98c6..00000000
--- a/core/integration/tests/server/tcp_server.rs
+++ /dev/null
@@ -1,277 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::server::scenarios::{
-    consumer_group_join_scenario, consumer_group_with_multiple_clients_polling_messages_scenario,
-    consumer_group_with_single_client_polling_messages_scenario, create_message_payload,
-    delete_segments_scenario::test_delete_segments_scenario, message_headers_scenario,
-    message_size_scenario, server_restart_scenario, stream_size_validation_scenario,
-    system_scenario, tcp_tls_scenario, user_scenario,
-};
-use iggy::prelude::*;
-use integration::{
-    tcp_client::TcpClientFactory,
-    test_server::{IpAddrKind, TestServer},
-};
-use serial_test::parallel;
-use std::{collections::HashMap, thread::sleep};
-
-#[tokio::test]
-#[parallel]
-async fn system_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    system_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn user_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    user_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_headers_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    message_headers_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn create_message_payload_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_join_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    consumer_group_join_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_with_single_client_polling_messages_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    consumer_group_with_single_client_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_with_multiple_clients_polling_messages_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    consumer_group_with_multiple_clients_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn stream_size_validation_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    stream_size_validation_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_size_scenario_should_be_valid() {
-    let mut test_server = TestServer::default();
-    test_server.start();
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-    message_size_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn should_delete_segments_via_tcp_binary_protocol() {
-    let mut extra_envs = HashMap::new();
-    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "1MiB".to_string());
-
-    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
-    test_server.start();
-
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-
-    test_delete_segments_scenario(&client_factory, &test_server).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn should_verify_data_integrity_after_server_restart_with_open_segment_index_cache() {
-    verify_data_integrity_after_server_restart_with_cache_setting("open_segment").await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn should_verify_data_integrity_after_server_restart_with_all_index_cache() {
-    verify_data_integrity_after_server_restart_with_cache_setting("all").await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn should_verify_data_integrity_after_server_restart_with_no_index_cache() {
-    verify_data_integrity_after_server_restart_with_cache_setting("none").await;
-}
-
-async fn verify_data_integrity_after_server_restart_with_cache_setting(cache_setting: &str) {
-    let data_dir = format!("local_data_restart_test_{}", uuid::Uuid::new_v4().simple());
-
-    let mut extra_env = HashMap::new();
-    extra_env.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "250KiB".to_string());
-    extra_env.insert("IGGY_SYSTEM_PATH".to_string(), data_dir.clone());
-    extra_env.insert(
-        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
-        cache_setting.to_string(),
-    );
-
-    let mut test_server = TestServer::new(Some(extra_env.clone()), false, None, IpAddrKind::V4);
-    test_server.start();
-
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-
-    server_restart_scenario::run(&client_factory, 0).await;
-
-    sleep(std::time::Duration::from_secs(2));
-
-    test_server.stop();
-    drop(test_server);
-
-    let mut test_server = TestServer::new(Some(extra_env), false, None, IpAddrKind::V4);
-    test_server.start();
-
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let client_factory = TcpClientFactory {
-        server_addr,
-        ..Default::default()
-    };
-
-    server_restart_scenario::run(&client_factory, 12).await;
-
-    test_server.stop();
-    drop(test_server);
-}
-
-#[tokio::test]
-#[parallel]
-async fn tcp_tls_scenario_should_be_valid() {
-    use iggy::clients::client_builder::IggyClientBuilder;
-    use integration::test_tls_utils::generate_test_certificates;
-
-    let temp_dir = tempfile::TempDir::new().expect("Failed to create temp dir");
-    let cert_dir = temp_dir.path();
-    let cert_dir_str = cert_dir.to_str().unwrap();
-
-    generate_test_certificates(cert_dir_str).expect("Failed to generate test certificates");
-
-    let mut extra_envs = HashMap::new();
-    extra_envs.insert("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string());
-    extra_envs.insert(
-        "IGGY_TCP_TLS_CERT_FILE".to_string(),
-        cert_dir.join("test_cert.pem").to_str().unwrap().to_string(),
-    );
-    extra_envs.insert(
-        "IGGY_TCP_TLS_KEY_FILE".to_string(),
-        cert_dir.join("test_key.pem").to_str().unwrap().to_string(),
-    );
-
-    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
-    test_server.start();
-
-    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-
-    let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let cert_path = cert_dir.join("test_cert.pem").to_str().unwrap().to_string();
-
-    let client = IggyClientBuilder::new()
-        .with_tcp()
-        .with_server_address(server_addr)
-        .with_tls_enabled(true)
-        .with_tls_domain("localhost".to_string())
-        .with_tls_ca_file(cert_path)
-        .build()
-        .expect("Failed to create TLS client");
-
-    client
-        .connect()
-        .await
-        .expect("Failed to connect TLS client");
-
-    let client = iggy::clients::client::IggyClient::create(Box::new(client), None, None);
-
-    tcp_tls_scenario::run(&client).await;
-}
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs
index c5b9bf01..00e0a71e 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -84,14 +84,14 @@ fn very_large_batches() -> Vec<u32> {
 #[tokio::test]
 async fn test_get_messages_by_timestamp(
     message_size: IggyByteSize,
-    batch_lentghs: Vec<u32>,
+    batch_lengths: Vec<u32>,
     messages_required_to_save: u32,
     segment_size: IggyByteSize,
     cache_indexes: CacheIndexesConfig,
 ) {
     println!(
         "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}",
-        message_size, batch_lentghs, messages_required_to_save, segment_size, cache_indexes
+        message_size, batch_lengths, messages_required_to_save, segment_size, cache_indexes
     );
 
     let setup = TestSetup::init().await;
@@ -99,7 +99,7 @@ async fn test_get_messages_by_timestamp(
     let topic_id = 1;
     let partition_id = 1;
 
-    let total_messages_count = batch_lentghs.iter().sum();
+    let total_messages_count = batch_lengths.iter().sum();
 
     let config = Arc::new(SystemConfig {
         path: setup.config.path.to_string(),
@@ -173,11 +173,11 @@ async fn test_get_messages_by_timestamp(
 
     // Timestamp tracking for messages
     let initial_timestamp = IggyTimestamp::now();
-    let mut batch_timestamps = Vec::with_capacity(batch_lentghs.len());
+    let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
     let mut current_pos = 0;
 
     // Append all batches as defined in the test matrix with separate timestamps
-    for (batch_idx, &batch_len) in batch_lentghs.iter().enumerate() {
+    for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
         // Add a small delay between batches to ensure distinct timestamps
         sleep(std::time::Duration::from_millis(2));
 
@@ -189,7 +189,7 @@ async fn test_get_messages_by_timestamp(
         println!(
             "Appending batch {}/{} with {} messages",
             batch_idx + 1,
-            batch_lentghs.len(),
+            batch_lengths.len(),
             batch_len
         );
 
@@ -241,7 +241,7 @@ async fn test_get_messages_by_timestamp(
         let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000);
 
         // Calculate how many messages should be in batches after the 3rd
-        let prior_batches_sum: u32 = batch_lentghs[..3].iter().sum();
+        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
         let remaining_messages = total_sent_messages - prior_batches_sum;
 
         let middle_messages = partition

Reply via email to