This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new deb9ff71 refactor(integration): consolidate server tests using test
matrices (#1901)
deb9ff71 is described below
commit deb9ff717ebd9fef16a6390af8dae70c984ec4c5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Jun 22 20:41:03 2025 +0200
refactor(integration): consolidate server tests using test matrices (#1901)
This refactoring eliminates code duplication across transport-specific
server tests (TCP, QUIC, HTTP) by introducing a unified test matrix
approach. The same test scenarios now run across all supported
transports automatically, ensuring consistent testing coverage while
reducing maintenance overhead.
Key changes:
- Moved benchmark utilities to a shared module (bench_utils)
- Added transport() and server_addr() methods to ClientFactory trait
- Created test matrices for general scenarios (all transports) and
consumer group scenarios (TCP/QUIC only)
- Consolidated data integrity tests to use parameterized cache settings
- Removed transport-specific test files in favor of unified approach
- Enhanced restart tests to verify data persistence with different
segment cache configurations
This change improves test maintainability by having a single source
of truth for each test scenario while preserving the ability to run
transport-specific tests where needed.
---
.gitignore | 2 +-
.../{tests/bench/mod.rs => src/bench_utils.rs} | 53 ++--
core/integration/src/http_client.rs | 10 +-
core/integration/src/lib.rs | 1 +
core/integration/src/quic_client.rs | 10 +-
core/integration/src/tcp_client.rs | 10 +-
core/integration/src/test_server.rs | 2 +
core/integration/tests/bench/http.rs | 64 -----
core/integration/tests/bench/quic.rs | 64 -----
core/integration/tests/bench/tcp.rs | 64 -----
.../data_integrity/verify_after_server_restart.rs | 95 +++++--
core/integration/tests/mod.rs | 1 -
core/integration/tests/server/cg.rs | 38 +++
core/integration/tests/server/general.rs | 41 +++
core/integration/tests/server/http_server.rs | 73 ------
core/integration/tests/server/mod.rs | 84 ++++++-
core/integration/tests/server/quic_server.rs | 105 --------
.../server/{mod.rs => scenarios/bench_scenario.rs} | 15 +-
.../server/scenarios/delete_segments_scenario.rs | 5 +-
core/integration/tests/server/scenarios/mod.rs | 12 +-
.../server/scenarios/server_restart_scenario.rs | 280 ---------------------
.../tests/server/scenarios/tcp_tls_scenario.rs | 2 +
core/integration/tests/server/specific.rs | 110 ++++++++
core/integration/tests/server/tcp_server.rs | 277 --------------------
.../tests/streaming/get_by_timestamp.rs | 14 +-
25 files changed, 420 insertions(+), 1012 deletions(-)
diff --git a/.gitignore b/.gitignore
index 806fdb39..d4da1bae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,7 +5,7 @@ target*
.DS_Store
.gradle
local_data*
-bench_*
+/bench_*
/sdk/errors_table/
/rust-clippy-results.sarif
/performance_results*
diff --git a/core/integration/tests/bench/mod.rs
b/core/integration/src/bench_utils.rs
similarity index 77%
rename from core/integration/tests/bench/mod.rs
rename to core/integration/src/bench_utils.rs
index 97b0dcb1..579d2e11 100644
--- a/core/integration/tests/bench/mod.rs
+++ b/core/integration/src/bench_utils.rs
@@ -1,28 +1,23 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-mod http;
-mod quic;
-mod tcp;
-
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::test_server::Transport;
use assert_cmd::prelude::CommandCargoExt;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{TEST_VERBOSITY_ENV_VAR, Transport};
+use iggy::prelude::*;
use std::{
fs::{self, File, OpenOptions},
io::Write,
@@ -38,7 +33,7 @@ const DEFAULT_NUMBER_OF_STREAMS: u64 = 8;
pub fn run_bench_and_wait_for_finish(
server_addr: &str,
- transport: Transport,
+ transport: &Transport,
bench: &str,
amount_of_data_to_process: IggyByteSize,
) {
@@ -47,7 +42,8 @@ pub fn run_bench_and_wait_for_finish(
let mut stderr_file_path = None;
let mut stdout_file_path = None;
- if std::env::var(TEST_VERBOSITY_ENV_VAR).is_err() {
+ let test_verbosity_env_var = "IGGY_TEST_VERBOSE";
+ if std::env::var(test_verbosity_env_var).is_err() {
let stderr_file = get_random_path();
let stdout_file = get_random_path();
stderr_file_path = Some(stderr_file);
@@ -68,7 +64,7 @@ pub fn run_bench_and_wait_for_finish(
"--message-size",
&message_size.to_string(),
bench,
- &format!("{}", transport),
+ &transport.to_string(),
"--server-address",
server_addr,
]);
@@ -76,7 +72,7 @@ pub fn run_bench_and_wait_for_finish(
// By default, all iggy-bench logs are redirected to files,
// and dumped to stderr when test fails. With IGGY_TEST_VERBOSE=1
// logs are dumped to stdout during test execution.
- if std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok() {
+ if std::env::var(test_verbosity_env_var).is_ok() {
command.stdout(Stdio::inherit());
command.stderr(Stdio::inherit());
} else {
@@ -127,7 +123,6 @@ pub fn run_bench_and_wait_for_finish(
panic!("Failed to get output from iggy-bench");
}
- // TODO: fix this, it needs to be called in Drop
if panicking() {
if let Some(stdout_file_path) = &stdout_file_path {
eprintln!(
diff --git a/core/integration/src/http_client.rs
b/core/integration/src/http_client.rs
index afd566ed..8d201d31 100644
--- a/core/integration/src/http_client.rs
+++ b/core/integration/src/http_client.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use crate::test_server::ClientFactory;
+use crate::test_server::{ClientFactory, Transport};
use async_trait::async_trait;
use iggy::http::http_client::HttpClient;
use iggy::prelude::{Client, HttpClientConfig};
@@ -37,6 +37,14 @@ impl ClientFactory for HttpClientFactory {
let client = HttpClient::create(Arc::new(config)).unwrap();
Box::new(client)
}
+
+ fn transport(&self) -> Transport {
+ Transport::Http
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
}
unsafe impl Send for HttpClientFactory {}
diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs
index be388600..1f65508b 100644
--- a/core/integration/src/lib.rs
+++ b/core/integration/src/lib.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+pub mod bench_utils;
pub mod file;
#[allow(deprecated)]
pub mod http_client;
diff --git a/core/integration/src/quic_client.rs
b/core/integration/src/quic_client.rs
index a8ef2446..0205fc40 100644
--- a/core/integration/src/quic_client.rs
+++ b/core/integration/src/quic_client.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use crate::test_server::ClientFactory;
+use crate::test_server::{ClientFactory, Transport};
use async_trait::async_trait;
use iggy::prelude::{Client, QuicClientConfig};
use iggy::quic::quick_client::QuicClient;
@@ -38,6 +38,14 @@ impl ClientFactory for QuicClientFactory {
Client::connect(&client).await.unwrap();
Box::new(client)
}
+
+ fn transport(&self) -> Transport {
+ Transport::Quic
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
}
unsafe impl Send for QuicClientFactory {}
diff --git a/core/integration/src/tcp_client.rs
b/core/integration/src/tcp_client.rs
index 35f4a894..a5a4ae04 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use crate::test_server::ClientFactory;
+use crate::test_server::{ClientFactory, Transport};
use async_trait::async_trait;
use iggy::prelude::{Client, TcpClient, TcpClientConfig};
use std::sync::Arc;
@@ -49,6 +49,14 @@ impl ClientFactory for TcpClientFactory {
});
Box::new(client)
}
+
+ fn transport(&self) -> Transport {
+ Transport::Tcp
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
}
unsafe impl Send for TcpClientFactory {}
diff --git a/core/integration/src/test_server.rs
b/core/integration/src/test_server.rs
index 322aa054..312ece59 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -54,6 +54,8 @@ pub enum IpAddrKind {
#[async_trait]
pub trait ClientFactory: Sync + Send {
async fn create_client(&self) -> Box<dyn Client>;
+ fn transport(&self) -> Transport;
+ fn server_addr(&self) -> String;
}
#[derive(Debug, Clone, Copy, PartialEq, Display)]
diff --git a/core/integration/tests/bench/http.rs
b/core/integration/tests/bench/http.rs
deleted file mode 100644
index f5bee4fb..00000000
--- a/core/integration/tests/bench/http.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::run_bench_and_wait_for_finish;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{IpAddrKind, TestServer, Transport};
-use serial_test::parallel;
-use std::str::FromStr;
-
-#[test]
-#[parallel]
-fn http_ipv4_bench() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Http,
- "pinned-producer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Http,
- "pinned-consumer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
-}
-
-#[cfg_attr(feature = "ci-qemu", ignore)]
-#[test]
-#[parallel]
-fn http_ipv6_bench() {
- let mut test_server = TestServer::new(None, true, None, IpAddrKind::V6);
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Http,
- "pinned-producer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Http,
- "pinned-consumer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
-}
diff --git a/core/integration/tests/bench/quic.rs
b/core/integration/tests/bench/quic.rs
deleted file mode 100644
index ce3cd193..00000000
--- a/core/integration/tests/bench/quic.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::run_bench_and_wait_for_finish;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{IpAddrKind, TestServer, Transport};
-use serial_test::parallel;
-use std::str::FromStr;
-
-#[test]
-#[parallel]
-fn quic_ipv4_bench() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Quic,
- "pinned-producer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Quic,
- "pinned-consumer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
-}
-
-#[cfg_attr(feature = "ci-qemu", ignore)]
-#[test]
-#[parallel]
-fn quic_ipv6_bench() {
- let mut test_server = TestServer::new(None, true, None, IpAddrKind::V6);
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Quic,
- "pinned-producer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Quic,
- "pinned-consumer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
-}
diff --git a/core/integration/tests/bench/tcp.rs
b/core/integration/tests/bench/tcp.rs
deleted file mode 100644
index 7b1fed9a..00000000
--- a/core/integration/tests/bench/tcp.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::run_bench_and_wait_for_finish;
-use iggy::prelude::IggyByteSize;
-use integration::test_server::{IpAddrKind, TestServer, Transport};
-use serial_test::parallel;
-use std::str::FromStr;
-
-#[test]
-#[parallel]
-fn tcp_ipv4_bench() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Tcp,
- "pinned-producer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Tcp,
- "pinned-consumer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
-}
-
-#[cfg_attr(feature = "ci-qemu", ignore)]
-#[test]
-#[parallel]
-fn tcp_ipv6_bench() {
- let mut test_server = TestServer::new(None, true, None, IpAddrKind::V6);
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Tcp,
- "pinned-producer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
- run_bench_and_wait_for_finish(
- &server_addr,
- Transport::Tcp,
- "pinned-consumer",
- IggyByteSize::from_str("8MB").unwrap(),
- );
-}
diff --git
a/core/integration/tests/data_integrity/verify_after_server_restart.rs
b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 8c11495b..22939896 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -16,9 +16,9 @@
* under the License.
*/
-use crate::bench::run_bench_and_wait_for_finish;
use iggy::clients::client::IggyClient;
use iggy::prelude::{Identifier, IggyByteSize, MessageClient, SystemClient};
+use integration::bench_utils::run_bench_and_wait_for_finish;
use integration::{
tcp_client::TcpClientFactory,
test_server::{
@@ -27,13 +27,42 @@ use integration::{
};
use serial_test::parallel;
use std::{collections::HashMap, str::FromStr};
+use test_case::test_matrix;
+
+/*
+ * Helper functions for test matrix parameters
+ */
+
+fn cache_open_segment() -> &'static str {
+ "open_segment"
+}
+
+fn cache_all() -> &'static str {
+ "all"
+}
+
+fn cache_none() -> &'static str {
+ "none"
+}
// TODO(numminex) - Move the message generation method from benchmark run to a
special method.
+#[test_matrix([cache_open_segment(), cache_all(), cache_none()])]
#[tokio::test]
#[parallel]
-async fn should_fill_data_and_verify_after_restart() {
- // 1. Start server
- let mut test_server = TestServer::new(None, false, None, IpAddrKind::V4);
+async fn should_fill_data_and_verify_after_restart(cache_setting: &'static
str) {
+ // 1. Start server with cache configuration
+ let env_vars = HashMap::from([
+ (
+ SYSTEM_PATH_ENV_VAR.to_owned(),
+ TestServer::get_random_path(),
+ ),
+ (
+ "IGGY_SEGMENT_CACHE_INDEXES".to_string(),
+ cache_setting.to_string(),
+ ),
+ ]);
+
+ let mut test_server = TestServer::new(Some(env_vars.clone()), false, None,
IpAddrKind::V4);
test_server.start();
let server_addr = test_server.get_raw_tcp_addr().unwrap();
let local_data_path = test_server.get_local_data_path().to_owned();
@@ -42,7 +71,7 @@ async fn should_fill_data_and_verify_after_restart() {
let amount_of_data_to_process = IggyByteSize::from_str("5 MB").unwrap();
run_bench_and_wait_for_finish(
&server_addr,
- Transport::Tcp,
+ &Transport::Tcp,
"pinned-producer",
amount_of_data_to_process,
);
@@ -50,7 +79,7 @@ async fn should_fill_data_and_verify_after_restart() {
// 3. Run poll bench to check if everything's OK
run_bench_and_wait_for_finish(
&server_addr,
- Transport::Tcp,
+ &Transport::Tcp,
"pinned-consumer",
amount_of_data_to_process,
);
@@ -94,18 +123,32 @@ async fn should_fill_data_and_verify_after_restart() {
let expected_clients_count = stats.clients_count;
let expected_consumer_groups_count = stats.consumer_groups_count;
- // 6. Stop server, remove current config file to properly fetch new server
TCP address
+ // 6. Stop server
test_server.stop();
drop(test_server);
- std::fs::remove_file(local_data_path.clone() +
"/runtime/current_config.toml").unwrap();
- // 7. Restart server
- let extra_envs = HashMap::from([(SYSTEM_PATH_ENV_VAR.to_owned(),
local_data_path.clone())]);
- let mut test_server = TestServer::new(Some(extra_envs), false, None,
IpAddrKind::V4);
+ // 7. Restart server with same settings
+ let mut test_server = TestServer::new(Some(env_vars), false, None,
IpAddrKind::V4);
test_server.start();
let server_addr = test_server.get_raw_tcp_addr().unwrap();
- // 8. Connect and login to newly started server
+ // 8. Run send bench again to add more data
+ run_bench_and_wait_for_finish(
+ &server_addr,
+ &Transport::Tcp,
+ "pinned-producer",
+ amount_of_data_to_process,
+ );
+
+ // 9. Run poll bench again to check if all data is still there
+ run_bench_and_wait_for_finish(
+ &server_addr,
+ &Transport::Tcp,
+ "pinned-consumer",
+ IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
+ );
+
+ // 10. Connect and login to newly started server
let client = IggyClient::create(
TcpClientFactory {
server_addr: server_addr.clone(),
@@ -118,7 +161,7 @@ async fn should_fill_data_and_verify_after_restart() {
);
login_root(&client).await;
- // 9. Save stats from the second server
+ // 11. Save stats from the second server (should have double the data)
let stats = client.get_stats().await.unwrap();
let actual_messages_size_bytes = stats.messages_size_bytes;
let actual_streams_count = stats.streams_count;
@@ -129,10 +172,11 @@ async fn should_fill_data_and_verify_after_restart() {
let actual_clients_count = stats.clients_count;
let actual_consumer_groups_count = stats.consumer_groups_count;
- // 10. Compare stats
+ // 12. Compare stats (expecting double the messages/size after second
bench run)
assert_eq!(
- expected_messages_size_bytes, actual_messages_size_bytes,
- "Messages size bytes"
+ expected_messages_size_bytes.as_bytes_usize() * 2,
+ actual_messages_size_bytes.as_bytes_usize(),
+ "Messages size bytes should be doubled"
);
assert_eq!(
expected_streams_count, actual_streams_count,
@@ -143,13 +187,14 @@ async fn should_fill_data_and_verify_after_restart() {
expected_partitions_count, actual_partitions_count,
"Partitions count"
);
- assert_eq!(
- expected_segments_count, actual_segments_count,
- "Segments count"
+ assert!(
+ actual_segments_count >= expected_segments_count,
+ "Segments count should be at least the same or more"
);
assert_eq!(
- expected_messages_count, actual_messages_count,
- "Messages count"
+ expected_messages_count * 2,
+ actual_messages_count,
+ "Messages count should be doubled"
);
assert_eq!(
expected_clients_count, actual_clients_count,
@@ -160,14 +205,14 @@ async fn should_fill_data_and_verify_after_restart() {
"Consumer groups count"
);
- // 11. Again run poll bench to check if data is still there
+ // 13. Run poll bench to check if all data (10MB total) is still there
run_bench_and_wait_for_finish(
&server_addr,
- Transport::Tcp,
+ &Transport::Tcp,
"pinned-consumer",
- amount_of_data_to_process,
+ IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
);
- // 12. Manual cleanup
+ // 14. Manual cleanup
std::fs::remove_dir_all(local_data_path).unwrap();
}
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index b0ec21d8..9d294947 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -26,7 +26,6 @@ use std::sync::{Arc, Once};
use std::{panic, thread};
mod archiver;
-mod bench;
mod cli;
mod config_provider;
mod data_integrity;
diff --git a/core/integration/tests/server/cg.rs
b/core/integration/tests/server/cg.rs
new file mode 100644
index 00000000..8ee3857e
--- /dev/null
+++ b/core/integration/tests/server/cg.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::server::{
+ ScenarioFn, join_scenario, multiple_clients_scenario, run_scenario,
single_client_scenario,
+};
+use integration::test_server::Transport;
+use serial_test::parallel;
+use test_case::test_matrix;
+
+// Consumer group scenarios do not support HTTP
+#[test_matrix(
+ [Transport::Tcp, Transport::Quic],
+ [
+ join_scenario(),
+ single_client_scenario(),
+ multiple_clients_scenario(),
+ ]
+)]
+#[tokio::test]
+#[parallel]
+async fn matrix(transport: Transport, scenario: ScenarioFn) {
+ run_scenario(transport, scenario).await;
+}
diff --git a/core/integration/tests/server/general.rs
b/core/integration/tests/server/general.rs
new file mode 100644
index 00000000..e98403b0
--- /dev/null
+++ b/core/integration/tests/server/general.rs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::server::{
+ ScenarioFn, bench_scenario, create_message_payload_scenario,
message_headers_scenario,
+ run_scenario, stream_size_validation_scenario, system_scenario,
user_scenario,
+};
+use integration::test_server::Transport;
+use serial_test::parallel;
+use test_case::test_matrix;
+
+#[test_matrix(
+ [Transport::Tcp, Transport::Quic, Transport::Http],
+ [
+ system_scenario(),
+ user_scenario(),
+ message_headers_scenario(),
+ create_message_payload_scenario(),
+ stream_size_validation_scenario(),
+ bench_scenario(),
+ ]
+)]
+#[tokio::test]
+#[parallel]
+async fn matrix(transport: Transport, scenario: ScenarioFn) {
+ run_scenario(transport, scenario).await;
+}
diff --git a/core/integration/tests/server/http_server.rs
b/core/integration/tests/server/http_server.rs
deleted file mode 100644
index fce1f1a8..00000000
--- a/core/integration/tests/server/http_server.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::server::scenarios::{
- create_message_payload, stream_size_validation_scenario, system_scenario,
user_scenario,
-};
-use integration::{http_client::HttpClientFactory, test_server::TestServer};
-use serial_test::parallel;
-
-#[tokio::test]
-#[parallel]
-async fn create_message_payload_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- let client_factory = HttpClientFactory { server_addr };
- create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_headers_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- let client_factory = HttpClientFactory { server_addr };
- create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn stream_size_validation_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- let client_factory = HttpClientFactory { server_addr };
- stream_size_validation_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn system_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- let client_factory = HttpClientFactory { server_addr };
- system_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn user_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_http_api_addr().unwrap();
- let client_factory = HttpClientFactory { server_addr };
- user_scenario::run(&client_factory).await;
-}
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index c9be19a8..6d7fa1ed 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -16,7 +16,85 @@
* under the License.
*/
-mod http_server;
-mod quic_server;
+mod cg;
+mod general;
mod scenarios;
-mod tcp_server;
+mod specific;
+
+use integration::{
+ http_client::HttpClientFactory,
+ quic_client::QuicClientFactory,
+ tcp_client::TcpClientFactory,
+ test_server::{ClientFactory, TestServer, Transport},
+};
+use scenarios::{
+ bench_scenario, consumer_group_join_scenario,
+ consumer_group_with_multiple_clients_polling_messages_scenario,
+ consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
+ message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
+};
+use std::future::Future;
+use std::pin::Pin;
+
+type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> +
'_>>;
+
+fn system_scenario() -> ScenarioFn {
+ |factory| Box::pin(system_scenario::run(factory))
+}
+
+fn user_scenario() -> ScenarioFn {
+ |factory| Box::pin(user_scenario::run(factory))
+}
+
+fn message_headers_scenario() -> ScenarioFn {
+ |factory| Box::pin(message_headers_scenario::run(factory))
+}
+
+fn create_message_payload_scenario() -> ScenarioFn {
+ |factory| Box::pin(create_message_payload::run(factory))
+}
+
+fn join_scenario() -> ScenarioFn {
+ |factory| Box::pin(consumer_group_join_scenario::run(factory))
+}
+
+fn stream_size_validation_scenario() -> ScenarioFn {
+ |factory| Box::pin(stream_size_validation_scenario::run(factory))
+}
+
+fn single_client_scenario() -> ScenarioFn {
+ |factory|
Box::pin(consumer_group_with_single_client_polling_messages_scenario::run(factory))
+}
+
+fn multiple_clients_scenario() -> ScenarioFn {
+ |factory|
Box::pin(consumer_group_with_multiple_clients_polling_messages_scenario::run(factory))
+}
+
+fn bench_scenario() -> ScenarioFn {
+ |factory| Box::pin(bench_scenario::run(factory))
+}
+
+async fn run_scenario(transport: Transport, scenario: ScenarioFn) {
+ let mut test_server = TestServer::default();
+ test_server.start();
+
+ let client_factory: Box<dyn ClientFactory> = match transport {
+ Transport::Tcp => {
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ Box::new(TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ })
+ }
+ Transport::Quic => {
+ let server_addr = test_server.get_quic_udp_addr().unwrap();
+ Box::new(QuicClientFactory { server_addr })
+ }
+ Transport::Http => {
+ let server_addr = test_server.get_http_api_addr().unwrap();
+ Box::new(HttpClientFactory { server_addr })
+ }
+ };
+
+ scenario(&*client_factory).await;
+}
diff --git a/core/integration/tests/server/quic_server.rs
b/core/integration/tests/server/quic_server.rs
deleted file mode 100644
index fcad0129..00000000
--- a/core/integration/tests/server/quic_server.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::server::scenarios::{
- consumer_group_join_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
- consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
- message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
-};
-use integration::{quic_client::QuicClientFactory, test_server::TestServer};
-use serial_test::parallel;
-
-#[tokio::test]
-#[parallel]
-async fn system_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
- system_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn user_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
- user_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_headers_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
- message_headers_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn create_message_payload_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
- create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_join_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
- consumer_group_join_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
consumer_group_with_single_client_polling_messages_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
-
consumer_group_with_single_client_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
consumer_group_with_multiple_clients_polling_messages_scenario_should_be_valid()
{
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
-
consumer_group_with_multiple_clients_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn stream_size_validation_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_quic_udp_addr().unwrap();
- let client_factory = QuicClientFactory { server_addr };
- stream_size_validation_scenario::run(&client_factory).await;
-}
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/scenarios/bench_scenario.rs
similarity index 61%
copy from core/integration/tests/server/mod.rs
copy to core/integration/tests/server/scenarios/bench_scenario.rs
index c9be19a8..990253f2 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/scenarios/bench_scenario.rs
@@ -16,7 +16,14 @@
* under the License.
*/
-mod http_server;
-mod quic_server;
-mod scenarios;
-mod tcp_server;
+use iggy::prelude::*;
+use integration::{bench_utils::run_bench_and_wait_for_finish,
test_server::ClientFactory};
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let server_addr = client_factory.server_addr();
+ let transport = client_factory.transport();
+ let data_size = IggyByteSize::from(8 * 1024 * 1024);
+
+ run_bench_and_wait_for_finish(&server_addr, &transport, "pinned-producer",
data_size);
+ run_bench_and_wait_for_finish(&server_addr, &transport, "pinned-consumer",
data_size);
+}
diff --git
a/core/integration/tests/server/scenarios/delete_segments_scenario.rs
b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
index 8cda05b5..d2e65f0c 100644
--- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs
+++ b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
@@ -27,10 +27,7 @@ const TOPIC_NAME: &str = "test_topic";
const PARTITION_ID: u32 = 1;
const LOG_EXTENSION: &str = "log";
-pub async fn test_delete_segments_scenario(
- client_factory: &dyn ClientFactory,
- test_server: &TestServer,
-) {
+pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer)
{
let client = client_factory.create_client().await;
let client = IggyClient::create(client, None, None);
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index 74676897..4671a95e 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -16,13 +16,7 @@
* under the License.
*/
-use iggy::clients::client::IggyClient;
-use iggy::prelude::ConsumerGroupDetails;
-use iggy::prelude::ConsumerKind;
-use iggy::prelude::Identifier;
-use iggy::prelude::{ConsumerGroupClient, StreamClient};
-use integration::test_server::{ClientFactory, delete_user};
-
+pub mod bench_scenario;
pub mod consumer_group_join_scenario;
pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
pub mod consumer_group_with_single_client_polling_messages_scenario;
@@ -30,12 +24,14 @@ pub mod create_message_payload;
pub mod delete_segments_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
-pub mod server_restart_scenario;
pub mod stream_size_validation_scenario;
pub mod system_scenario;
pub mod tcp_tls_scenario;
pub mod user_scenario;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, delete_user};
+
const STREAM_ID: u32 = 1;
const TOPIC_ID: u32 = 1;
const PARTITION_ID: u32 = 1;
diff --git a/core/integration/tests/server/scenarios/server_restart_scenario.rs
b/core/integration/tests/server/scenarios/server_restart_scenario.rs
deleted file mode 100644
index f41febd1..00000000
--- a/core/integration/tests/server/scenarios/server_restart_scenario.rs
+++ /dev/null
@@ -1,280 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use bytes::Bytes;
-use iggy::prelude::*;
-use integration::test_server::ClientFactory;
-
-const STREAM_ID: u32 = 1;
-const TOPIC_ID: u32 = 1;
-const PARTITION_ID: u32 = 1;
-const PARTITIONS_COUNT: u32 = 1;
-const MESSAGE_SIZE: usize = 50_000;
-
-pub async fn run(client_factory: &dyn ClientFactory,
expected_messages_before_restart: u32) {
- if expected_messages_before_restart == 0 {
- run_initial_phase(client_factory).await;
- } else {
- run_after_restart_phase(client_factory,
expected_messages_before_restart).await;
- }
-}
-
-async fn run_initial_phase(client_factory: &dyn ClientFactory) {
- let consumer = Consumer {
- kind: ConsumerKind::Consumer,
- id: Identifier::numeric(1).unwrap(),
- };
-
- let client = client_factory.create_client().await;
- let client = IggyClient::create(client, None, None);
-
- client
- .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
- .await
- .unwrap();
-
- client
- .create_stream("test-stream", Some(STREAM_ID))
- .await
- .unwrap();
-
- let stream = client
- .get_stream(&Identifier::numeric(STREAM_ID).unwrap())
- .await
- .unwrap()
- .expect("Initial stream should exist");
-
- assert_eq!(stream.id, STREAM_ID);
- assert_eq!(stream.name, "test-stream");
-
- client
- .create_topic(
- &Identifier::numeric(STREAM_ID).unwrap(),
- "test-topic",
- PARTITIONS_COUNT,
- CompressionAlgorithm::None,
- None,
- Some(TOPIC_ID),
- IggyExpiry::NeverExpire,
- MaxTopicSize::ServerDefault,
- )
- .await
- .unwrap();
-
- let messages_per_batch = 4;
- let mut total_messages = 0;
-
- for batch in 0..3 {
- let mut messages = create_messages(batch * messages_per_batch,
messages_per_batch);
- client
- .send_messages(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- &Partitioning::partition_id(PARTITION_ID),
- &mut messages,
- )
- .await
- .unwrap();
- total_messages += messages_per_batch;
- }
-
- let polled_messages = client
- .poll_messages(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- Some(PARTITION_ID),
- &consumer,
- &PollingStrategy::offset(0),
- total_messages,
- false,
- )
- .await
- .unwrap();
-
- assert_eq!(polled_messages.messages.len() as u32, total_messages);
- verify_messages(&polled_messages.messages, 0, total_messages);
-}
-
-async fn run_after_restart_phase(
- client_factory: &dyn ClientFactory,
- expected_messages_before_restart: u32,
-) {
- let consumer = Consumer {
- kind: ConsumerKind::Consumer,
- id: Identifier::numeric(1).unwrap(),
- };
-
- let client = client_factory.create_client().await;
- let client = IggyClient::create(client, None, None);
-
- client
- .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
- .await
- .unwrap();
-
- let stream = client
- .get_stream(&Identifier::numeric(STREAM_ID).unwrap())
- .await
- .unwrap()
- .expect("Stream should exist after restart");
-
- assert_eq!(stream.id, STREAM_ID);
- assert_eq!(stream.name, "test-stream");
-
- let topic = client
- .get_topic(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- )
- .await
- .unwrap()
- .expect("Topic should exist after restart");
-
- assert_eq!(topic.id, TOPIC_ID);
- assert_eq!(topic.name, "test-topic");
- assert_eq!(
- topic.messages_count,
- expected_messages_before_restart as u64
- );
-
- let messages_per_batch = 4;
- let second_phase_messages = messages_per_batch * 2;
-
- for batch in 0..2 {
- let start_id = expected_messages_before_restart + (batch *
messages_per_batch);
- let mut messages = create_messages(start_id, messages_per_batch);
- client
- .send_messages(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- &Partitioning::partition_id(PARTITION_ID),
- &mut messages,
- )
- .await
- .unwrap();
- }
-
- let total_messages = expected_messages_before_restart +
second_phase_messages;
-
- // First, poll messages one-by-one to verify individual message access
- for i in 0..total_messages {
- let single_message = client
- .poll_messages(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- Some(PARTITION_ID),
- &consumer,
- &PollingStrategy::offset(i as u64),
- 1,
- false,
- )
- .await
- .unwrap_or_else(|error| {
- panic!("Failed to poll message at offset {i}: {error}");
- });
-
- assert_eq!(single_message.messages.len(), 1);
- let msg = &single_message.messages[0];
- assert_eq!(msg.header.offset, i as u64);
-
- let payload_str = String::from_utf8_lossy(&msg.payload);
- assert!(payload_str.starts_with(&format!("Message {} -", i)));
- }
-
- // Then poll all messages at once
- let all_messages = client
- .poll_messages(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- Some(PARTITION_ID),
- &consumer,
- &PollingStrategy::offset(0),
- total_messages,
- false,
- )
- .await
- .unwrap();
-
- assert_eq!(all_messages.messages.len() as u32, total_messages);
-
- verify_messages(&all_messages.messages, 0, total_messages);
-
- let new_messages = client
- .poll_messages(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- Some(PARTITION_ID),
- &consumer,
- &PollingStrategy::offset(expected_messages_before_restart as u64),
- second_phase_messages,
- false,
- )
- .await
- .unwrap();
-
- assert_eq!(new_messages.messages.len() as u32, second_phase_messages);
- verify_messages(
- &new_messages.messages,
- expected_messages_before_restart,
- second_phase_messages,
- );
-
- let final_topic = client
- .get_topic(
- &Identifier::numeric(STREAM_ID).unwrap(),
- &Identifier::numeric(TOPIC_ID).unwrap(),
- )
- .await
- .unwrap()
- .expect("Failed to get topic");
-
- assert_eq!(final_topic.messages_count, total_messages as u64);
- println!(
- "Test completed: {} total messages across {} segments",
- total_messages, final_topic.partitions[0].segments_count
- );
-}
-
-fn create_messages(start_id: u32, count: u32) -> Vec<IggyMessage> {
- let mut messages = Vec::new();
- let large_payload = "x".repeat(MESSAGE_SIZE);
-
- for i in 0..count {
- let id = (start_id + i + 1) as u128;
- let payload = format!("Message {} - {}", start_id + i, large_payload);
- messages.push(
- IggyMessage::builder()
- .id(id)
- .payload(Bytes::from(payload))
- .build()
- .expect("Failed to create message"),
- );
- }
- messages
-}
-
-fn verify_messages(messages: &[IggyMessage], start_offset: u32, _count: u32) {
- for (idx, message) in messages.iter().enumerate() {
- let expected_offset = start_offset + idx as u32;
- assert_eq!(message.header.offset, expected_offset as u64);
-
- let payload_str = String::from_utf8_lossy(&message.payload);
- assert!(payload_str.starts_with(&format!("Message {} -",
expected_offset)));
- assert!(payload_str.len() > MESSAGE_SIZE);
- }
-}
diff --git a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
index aa70cafe..315cf5ea 100644
--- a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
+++ b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
@@ -60,6 +60,7 @@ pub async fn run(client: &IggyClient) {
.build()
.unwrap(),
];
+
client
.send_messages(
&Identifier::numeric(stream_id).unwrap(),
@@ -90,5 +91,6 @@ pub async fn run(client: &IggyClient) {
.delete_stream(&Identifier::numeric(stream_id).unwrap())
.await
.unwrap();
+
assert_clean_system(client).await;
}
diff --git a/core/integration/tests/server/specific.rs
b/core/integration/tests/server/specific.rs
new file mode 100644
index 00000000..d2d28e37
--- /dev/null
+++ b/core/integration/tests/server/specific.rs
@@ -0,0 +1,110 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{delete_segments_scenario,
message_size_scenario, tcp_tls_scenario};
+use iggy::prelude::*;
+use integration::{
+ tcp_client::TcpClientFactory,
+ test_server::{IpAddrKind, TestServer},
+ test_tls_utils::generate_test_certificates,
+};
+use serial_test::parallel;
+use std::collections::HashMap;
+
+// This test can run on any transport, but it requires both ClientFactory and
+// TestServer parameters, which doesn't fit the unified matrix approach.
+#[tokio::test]
+#[parallel]
+async fn should_delete_segments_and_validate_filesystem() {
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"1MiB".to_string());
+
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
+ test_server.start();
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let client_factory = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ };
+
+ delete_segments_scenario::run(&client_factory, &test_server).await;
+}
+
+// TCP TLS scenario is obviously specific to TCP transport, and requires
special
+// setup so it's not included in the matrix.
+#[tokio::test]
+#[parallel]
+async fn tcp_tls_scenario_should_be_valid() {
+ let temp_dir = tempfile::TempDir::new().expect("Failed to create temp
dir");
+ let cert_dir = temp_dir.path();
+ let cert_dir_str = cert_dir.to_str().unwrap();
+
+ generate_test_certificates(cert_dir_str).expect("Failed to generate test
certificates");
+
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string());
+ extra_envs.insert(
+ "IGGY_TCP_TLS_CERT_FILE".to_string(),
+ cert_dir.join("test_cert.pem").to_str().unwrap().to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_TCP_TLS_KEY_FILE".to_string(),
+ cert_dir.join("test_key.pem").to_str().unwrap().to_string(),
+ );
+
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
+ test_server.start();
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let cert_path =
cert_dir.join("test_cert.pem").to_str().unwrap().to_string();
+
+ let client = IggyClientBuilder::new()
+ .with_tcp()
+ .with_server_address(server_addr)
+ .with_tls_enabled(true)
+ .with_tls_domain("localhost".to_string())
+ .with_tls_ca_file(cert_path)
+ .build()
+ .expect("Failed to create TLS client");
+
+ client
+ .connect()
+ .await
+ .expect("Failed to connect TLS client");
+
+ let client = IggyClient::create(Box::new(client), None, None);
+
+ tcp_tls_scenario::run(&client).await;
+}
+
+// Message size scenario is specific to TCP transport to test the behavior
around the maximum message size.
+// When run on other transports, it will fail because both QUIC and HTTP have
different message size limits.
+#[tokio::test]
+#[parallel]
+async fn message_size_scenario() {
+ let mut test_server = TestServer::default();
+ test_server.start();
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let client_factory = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ };
+
+ message_size_scenario::run(&client_factory).await;
+}
diff --git a/core/integration/tests/server/tcp_server.rs
b/core/integration/tests/server/tcp_server.rs
deleted file mode 100644
index 550d98c6..00000000
--- a/core/integration/tests/server/tcp_server.rs
+++ /dev/null
@@ -1,277 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::server::scenarios::{
- consumer_group_join_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
- consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
- delete_segments_scenario::test_delete_segments_scenario,
message_headers_scenario,
- message_size_scenario, server_restart_scenario,
stream_size_validation_scenario,
- system_scenario, tcp_tls_scenario, user_scenario,
-};
-use iggy::prelude::*;
-use integration::{
- tcp_client::TcpClientFactory,
- test_server::{IpAddrKind, TestServer},
-};
-use serial_test::parallel;
-use std::{collections::HashMap, thread::sleep};
-
-#[tokio::test]
-#[parallel]
-async fn system_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- system_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn user_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- user_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_headers_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- message_headers_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn create_message_payload_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- create_message_payload::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn consumer_group_join_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- consumer_group_join_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
consumer_group_with_single_client_polling_messages_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
-
consumer_group_with_single_client_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
consumer_group_with_multiple_clients_polling_messages_scenario_should_be_valid()
{
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
-
consumer_group_with_multiple_clients_polling_messages_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn stream_size_validation_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- stream_size_validation_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn message_size_scenario_should_be_valid() {
- let mut test_server = TestServer::default();
- test_server.start();
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
- message_size_scenario::run(&client_factory).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn should_delete_segments_via_tcp_binary_protocol() {
- let mut extra_envs = HashMap::new();
- extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"1MiB".to_string());
-
- let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
- test_server.start();
-
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
-
- test_delete_segments_scenario(&client_factory, &test_server).await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
should_verify_data_integrity_after_server_restart_with_open_segment_index_cache()
{
-
verify_data_integrity_after_server_restart_with_cache_setting("open_segment").await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
should_verify_data_integrity_after_server_restart_with_all_index_cache() {
- verify_data_integrity_after_server_restart_with_cache_setting("all").await;
-}
-
-#[tokio::test]
-#[parallel]
-async fn
should_verify_data_integrity_after_server_restart_with_no_index_cache() {
-
verify_data_integrity_after_server_restart_with_cache_setting("none").await;
-}
-
-async fn
verify_data_integrity_after_server_restart_with_cache_setting(cache_setting:
&str) {
- let data_dir = format!("local_data_restart_test_{}",
uuid::Uuid::new_v4().simple());
-
- let mut extra_env = HashMap::new();
- extra_env.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"250KiB".to_string());
- extra_env.insert("IGGY_SYSTEM_PATH".to_string(), data_dir.clone());
- extra_env.insert(
- "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
- cache_setting.to_string(),
- );
-
- let mut test_server = TestServer::new(Some(extra_env.clone()), false,
None, IpAddrKind::V4);
- test_server.start();
-
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
-
- server_restart_scenario::run(&client_factory, 0).await;
-
- sleep(std::time::Duration::from_secs(2));
-
- test_server.stop();
- drop(test_server);
-
- let mut test_server = TestServer::new(Some(extra_env), false, None,
IpAddrKind::V4);
- test_server.start();
-
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let client_factory = TcpClientFactory {
- server_addr,
- ..Default::default()
- };
-
- server_restart_scenario::run(&client_factory, 12).await;
-
- test_server.stop();
- drop(test_server);
-}
-
-#[tokio::test]
-#[parallel]
-async fn tcp_tls_scenario_should_be_valid() {
- use iggy::clients::client_builder::IggyClientBuilder;
- use integration::test_tls_utils::generate_test_certificates;
-
- let temp_dir = tempfile::TempDir::new().expect("Failed to create temp
dir");
- let cert_dir = temp_dir.path();
- let cert_dir_str = cert_dir.to_str().unwrap();
-
- generate_test_certificates(cert_dir_str).expect("Failed to generate test
certificates");
-
- let mut extra_envs = HashMap::new();
- extra_envs.insert("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string());
- extra_envs.insert(
- "IGGY_TCP_TLS_CERT_FILE".to_string(),
- cert_dir.join("test_cert.pem").to_str().unwrap().to_string(),
- );
- extra_envs.insert(
- "IGGY_TCP_TLS_KEY_FILE".to_string(),
- cert_dir.join("test_key.pem").to_str().unwrap().to_string(),
- );
-
- let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
- test_server.start();
-
- tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-
- let server_addr = test_server.get_raw_tcp_addr().unwrap();
- let cert_path =
cert_dir.join("test_cert.pem").to_str().unwrap().to_string();
-
- let client = IggyClientBuilder::new()
- .with_tcp()
- .with_server_address(server_addr)
- .with_tls_enabled(true)
- .with_tls_domain("localhost".to_string())
- .with_tls_ca_file(cert_path)
- .build()
- .expect("Failed to create TLS client");
-
- client
- .connect()
- .await
- .expect("Failed to connect TLS client");
-
- let client = iggy::clients::client::IggyClient::create(Box::new(client),
None, None);
-
- tcp_tls_scenario::run(&client).await;
-}
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs
b/core/integration/tests/streaming/get_by_timestamp.rs
index c5b9bf01..00e0a71e 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -84,14 +84,14 @@ fn very_large_batches() -> Vec<u32> {
#[tokio::test]
async fn test_get_messages_by_timestamp(
message_size: IggyByteSize,
- batch_lentghs: Vec<u32>,
+ batch_lengths: Vec<u32>,
messages_required_to_save: u32,
segment_size: IggyByteSize,
cache_indexes: CacheIndexesConfig,
) {
println!(
"Running test with message_size: {}, batches: {:?},
messages_required_to_save: {}, segment_size: {}, cache_indexes: {}",
- message_size, batch_lentghs, messages_required_to_save, segment_size,
cache_indexes
+ message_size, batch_lengths, messages_required_to_save, segment_size,
cache_indexes
);
let setup = TestSetup::init().await;
@@ -99,7 +99,7 @@ async fn test_get_messages_by_timestamp(
let topic_id = 1;
let partition_id = 1;
- let total_messages_count = batch_lentghs.iter().sum();
+ let total_messages_count = batch_lengths.iter().sum();
let config = Arc::new(SystemConfig {
path: setup.config.path.to_string(),
@@ -173,11 +173,11 @@ async fn test_get_messages_by_timestamp(
// Timestamp tracking for messages
let initial_timestamp = IggyTimestamp::now();
- let mut batch_timestamps = Vec::with_capacity(batch_lentghs.len());
+ let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
let mut current_pos = 0;
// Append all batches as defined in the test matrix with separate
timestamps
- for (batch_idx, &batch_len) in batch_lentghs.iter().enumerate() {
+ for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
// Add a small delay between batches to ensure distinct timestamps
sleep(std::time::Duration::from_millis(2));
@@ -189,7 +189,7 @@ async fn test_get_messages_by_timestamp(
println!(
"Appending batch {}/{} with {} messages",
batch_idx + 1,
- batch_lentghs.len(),
+ batch_lengths.len(),
batch_len
);
@@ -241,7 +241,7 @@ async fn test_get_messages_by_timestamp(
let middle_timestamp =
IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000);
// Calculate how many messages should be in batches after the 3rd
- let prior_batches_sum: u32 = batch_lentghs[..3].iter().sum();
+ let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
let remaining_messages = total_sent_messages - prior_batches_sum;
let middle_messages = partition