This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 3dcdda940 chore(integration): remove streaming tests superseded by
API-level coverage (#2591)
3dcdda940 is described below
commit 3dcdda9405009a4c5efa307c597ea7f86c5ee48e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jan 20 15:53:55 2026 +0100
chore(integration): remove streaming tests superseded by API-level coverage
(#2591)
The streaming-layer tests (get_by_offset, get_by_timestamp)
are redundant now that get_messages_by_offset_api.rs covers
the same scenarios through the full client stack.
Besides that, migrated snapshot test to API-level scenario.
---
core/integration/tests/mod.rs | 1 -
core/integration/tests/server/cg.rs | 10 +-
core/integration/tests/server/general.rs | 3 +-
core/integration/tests/server/message_retrieval.rs | 10 +-
core/integration/tests/server/mod.rs | 7 +-
core/integration/tests/server/scenarios/mod.rs | 5 +-
...essages_by_offset_api.rs => offset_scenario.rs} | 43 +++
.../scenarios/snapshot_scenario.rs} | 44 +--
...s_by_timestamp_api.rs => timestamp_scenario.rs} | 0
core/integration/tests/streaming/common/mod.rs | 19 -
.../tests/streaming/common/test_setup.rs | 52 ---
core/integration/tests/streaming/get_by_offset.rs | 396 ---------------------
.../tests/streaming/get_by_timestamp.rs | 372 -------------------
core/integration/tests/streaming/mod.rs | 132 -------
14 files changed, 84 insertions(+), 1010 deletions(-)
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index fb2156d8e..b451f4499 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -33,7 +33,6 @@ mod mcp;
mod sdk;
mod server;
mod state;
-mod streaming;
lazy_static! {
static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false);
diff --git a/core/integration/tests/server/cg.rs
b/core/integration/tests/server/cg.rs
index 736024c25..bf12c871e 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -23,10 +23,18 @@ use iggy_common::TransportProtocol;
use serial_test::parallel;
use test_case::test_matrix;
+fn tcp() -> TransportProtocol {
+ TransportProtocol::Tcp
+}
+
+fn websocket() -> TransportProtocol {
+ TransportProtocol::WebSocket
+}
+
// TODO: Add `QUIC`.
// Consumer group scenarios do not support HTTP
#[test_matrix(
- [TransportProtocol::Tcp, TransportProtocol::WebSocket],
+ [tcp(), websocket()],
[
join_scenario(),
single_client_scenario(),
diff --git a/core/integration/tests/server/general.rs
b/core/integration/tests/server/general.rs
index 50277cc3c..ec88d6ffc 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -18,7 +18,7 @@
use crate::server::{
ScenarioFn, authentication_scenario, bench_scenario,
consumer_timestamp_polling_scenario,
create_message_payload_scenario, message_headers_scenario,
permissions_scenario, run_scenario,
- stream_size_validation_scenario, system_scenario, user_scenario,
+ snapshot_scenario, stream_size_validation_scenario, system_scenario,
user_scenario,
};
use iggy_common::TransportProtocol;
use serial_test::parallel;
@@ -36,6 +36,7 @@ use test_case::test_matrix;
stream_size_validation_scenario(),
bench_scenario(),
consumer_timestamp_polling_scenario(),
+ snapshot_scenario(),
]
)]
#[tokio::test]
diff --git a/core/integration/tests/server/message_retrieval.rs
b/core/integration/tests/server/message_retrieval.rs
index 7061d27a0..24a4ff0d0 100644
--- a/core/integration/tests/server/message_retrieval.rs
+++ b/core/integration/tests/server/message_retrieval.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use crate::server::scenarios::{get_messages_by_offset_api,
get_messages_by_timestamp_api};
+use crate::server::scenarios::{offset_scenario, timestamp_scenario};
use integration::{
tcp_client::TcpClientFactory,
test_server::{IpAddrKind, TestServer},
@@ -72,7 +72,7 @@ fn msgs_req_9984() -> u32 {
)]
#[tokio::test]
#[parallel]
-async fn test_get_messages_by_offset_api(
+async fn get_by_offset_scenario(
segment_size: &str,
cache_indexes: &str,
messages_required_to_save: u32,
@@ -105,7 +105,7 @@ async fn test_get_messages_by_offset_api(
..Default::default()
};
- get_messages_by_offset_api::run(&client_factory).await;
+ offset_scenario::run(&client_factory).await;
}
#[test_matrix(
@@ -115,7 +115,7 @@ async fn test_get_messages_by_offset_api(
)]
#[tokio::test]
#[parallel]
-async fn test_get_messages_by_timestamp_api(
+async fn get_by_timestamp_scenario(
segment_size: &str,
cache_indexes: &str,
messages_required_to_save: u32,
@@ -148,5 +148,5 @@ async fn test_get_messages_by_timestamp_api(
..Default::default()
};
- get_messages_by_timestamp_api::run(&client_factory).await;
+ timestamp_scenario::run(&client_factory).await;
}
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index 1a9285640..65ee5d2c4 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -37,7 +37,8 @@ use scenarios::{
consumer_group_with_multiple_clients_polling_messages_scenario,
consumer_group_with_single_client_polling_messages_scenario,
consumer_timestamp_polling_scenario, create_message_payload,
message_headers_scenario,
- permissions_scenario, stream_size_validation_scenario, system_scenario,
user_scenario,
+ permissions_scenario, snapshot_scenario, stream_size_validation_scenario,
system_scenario,
+ user_scenario,
};
use std::pin::Pin;
use std::{collections::HashMap, future::Future};
@@ -100,6 +101,10 @@ fn permissions_scenario() -> ScenarioFn {
|factory| Box::pin(permissions_scenario::run(factory))
}
+fn snapshot_scenario() -> ScenarioFn {
+ |factory| Box::pin(snapshot_scenario::run(factory))
+}
+
fn consumer_timestamp_polling_scenario() -> ScenarioFn {
|factory| Box::pin(consumer_timestamp_polling_scenario::run(factory))
}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index 58cc8a3b2..3253493c4 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,18 +29,19 @@ pub mod create_message_payload;
pub mod cross_protocol_pat_scenario;
pub mod delete_segments_scenario;
pub mod encryption_scenario;
-pub mod get_messages_by_offset_api;
-pub mod get_messages_by_timestamp_api;
pub mod message_headers_scenario;
pub mod message_size_scenario;
+pub mod offset_scenario;
pub mod permissions_scenario;
pub mod read_during_persistence_scenario;
pub mod segment_rotation_race_scenario;
pub mod single_message_per_batch_scenario;
+pub mod snapshot_scenario;
pub mod stale_client_consumer_group_scenario;
pub mod stream_size_validation_scenario;
pub mod system_scenario;
pub mod tcp_tls_scenario;
+pub mod timestamp_scenario;
pub mod user_scenario;
pub mod websocket_tls_scenario;
diff --git
a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
b/core/integration/tests/server/scenarios/offset_scenario.rs
similarity index 90%
rename from
core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
rename to core/integration/tests/server/scenarios/offset_scenario.rs
index 568fd847d..cc519bd63 100644
--- a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
+++ b/core/integration/tests/server/scenarios/offset_scenario.rs
@@ -107,6 +107,7 @@ async fn run_offset_test(
&batch_offsets,
)
.await;
+ verify_sequential_chunk_reads(client, &stream_name, &topic_name,
total_messages_count).await;
cleanup(client, &stream_name).await;
}
@@ -416,6 +417,48 @@ async fn verify_message_content(
}
}
+async fn verify_sequential_chunk_reads(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ total_messages: u32,
+) {
+ let chunk_size = 500u32;
+ let mut verified_count = 0u32;
+
+ for chunk_start in (0..total_messages).step_by(chunk_size as usize) {
+ let read_size = std::cmp::min(chunk_size, total_messages -
chunk_start);
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(chunk_start as u64),
+ read_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ read_size,
+ "Failed to read chunk at offset {} with size {}",
+ chunk_start,
+ read_size
+ );
+
+ verified_count += polled.messages.len() as u32;
+ }
+
+ assert_eq!(
+ verified_count, total_messages,
+ "Sequential chunk reads didn't cover all messages"
+ );
+}
+
async fn cleanup(client: &IggyClient, stream_name: &str) {
client
.delete_stream(&Identifier::named(stream_name).unwrap())
diff --git a/core/integration/tests/streaming/snapshot.rs
b/core/integration/tests/server/scenarios/snapshot_scenario.rs
similarity index 56%
rename from core/integration/tests/streaming/snapshot.rs
rename to core/integration/tests/server/scenarios/snapshot_scenario.rs
index 76c9fa971..0780ca6c3 100644
--- a/core/integration/tests/streaming/snapshot.rs
+++ b/core/integration/tests/server/scenarios/snapshot_scenario.rs
@@ -16,40 +16,28 @@
* under the License.
*/
-//TODO: Fix me use shard instead of system
-/*
-use crate::streaming::common::test_setup::TestSetup;
-use iggy::prelude::{SnapshotCompression, SystemSnapshotType};
-use server::configs::cluster::ClusterConfig;
-use server::configs::server::{DataMaintenanceConfig,
PersonalAccessTokenConfig};
-use server::streaming::session::Session;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, assert_clean_system};
use std::io::{Cursor, Read};
-use std::net::{Ipv4Addr, SocketAddr};
use zip::ZipArchive;
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = client_factory.create_client().await;
+ let client = IggyClient::create(client, None, None);
-#[tokio::test]
-async fn should_create_snapshot_file() {
- let setup = TestSetup::init().await;
- let mut system = System::new(
- setup.config.clone(),
- ClusterConfig::default(),
- DataMaintenanceConfig::default(),
- PersonalAccessTokenConfig::default(),
- );
-
- system.init().await.unwrap();
-
- let session = Session::new(1, 1,
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
+ client
+ .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+ .await
+ .unwrap();
- let snapshot = system
- .get_snapshot(
- &session,
+ let snapshot = client
+ .snapshot(
SnapshotCompression::Deflated,
- &vec![SystemSnapshotType::Test],
+ vec![SystemSnapshotType::Test],
)
.await
.unwrap();
+
assert!(!snapshot.0.is_empty());
let cursor = Cursor::new(snapshot.0);
@@ -57,7 +45,7 @@ async fn should_create_snapshot_file() {
let mut test_file = zip.by_name("test.txt").unwrap();
let mut test_content = String::new();
test_file.read_to_string(&mut test_content).unwrap();
- assert_eq!(test_content, "test\n");
-}
+ assert_eq!(test_content.trim(), "test");
-*/
+ assert_clean_system(&client).await;
+}
diff --git
a/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
b/core/integration/tests/server/scenarios/timestamp_scenario.rs
similarity index 100%
rename from
core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
rename to core/integration/tests/server/scenarios/timestamp_scenario.rs
diff --git a/core/integration/tests/streaming/common/mod.rs
b/core/integration/tests/streaming/common/mod.rs
deleted file mode 100644
index 6bab3f356..000000000
--- a/core/integration/tests/streaming/common/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-pub mod test_setup;
diff --git a/core/integration/tests/streaming/common/test_setup.rs
b/core/integration/tests/streaming/common/test_setup.rs
deleted file mode 100644
index f85aa73ea..000000000
--- a/core/integration/tests/streaming/common/test_setup.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use compio::fs;
-use iggy_common::MemoryPool;
-use server::bootstrap::create_directories;
-use server::configs::system::SystemConfig;
-use std::sync::Arc;
-use uuid::Uuid;
-
-pub struct TestSetup {
- pub config: Arc<SystemConfig>,
-}
-
-impl TestSetup {
- pub async fn init() -> TestSetup {
- Self::init_with_config(SystemConfig::default()).await
- }
-
- pub async fn init_with_config(mut config: SystemConfig) -> TestSetup {
- config.path = format!("local_data_{}", Uuid::now_v7().to_u128_le());
- config.partition.enforce_fsync = true;
- config.state.enforce_fsync = true;
-
- let config = Arc::new(config);
- fs::create_dir(config.get_system_path()).await.unwrap();
- create_directories(&config).await.unwrap();
- MemoryPool::init_pool(&config.memory_pool.into_other());
- TestSetup { config }
- }
-}
-
-impl Drop for TestSetup {
- fn drop(&mut self) {
- std::fs::remove_dir_all(self.config.get_system_path()).unwrap();
- }
-}
diff --git a/core/integration/tests/streaming/get_by_offset.rs
b/core/integration/tests/streaming/get_by_offset.rs
deleted file mode 100644
index ca5438529..000000000
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ /dev/null
@@ -1,396 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses let consumer =
PollingConsumer::consumer(&Identifier::numeric(2).unwrap(), partition_id as
usize);
- let args = PollingArgs::new(PollingStrategy::offset(middle_offset + 1),
remaining_messages, false);
- let (_, middle_messages) = streams
- .poll_messages(&namespace, consumer, args)s file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-use super::bootstrap_test_environment;
-use crate::streaming::common::test_setup::TestSetup;
-use bytes::BytesMut;
-use iggy::prelude::*;
-use server::configs::cache_indexes::CacheIndexesConfig;
-use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
-use server::shard::namespace::IggyFullNamespace;
-use server::shard::system::messages::PollingArgs;
-use server::streaming::polling_consumer::PollingConsumer;
-use server::streaming::segments::IggyMessagesBatchMut;
-use server::streaming::traits::MainOps;
-use std::collections::HashMap;
-use std::str::FromStr;
-use test_case::test_matrix;
-
-/*
- * Below helper functions are here only to make test function name more
readable.
- */
-
-fn msg_size(size: u64) -> IggyByteSize {
- IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn segment_size(size: u64) -> IggyByteSize {
- IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn msgs_req_to_save(count: u32) -> u32 {
- count
-}
-
-fn index_cache_all() -> CacheIndexesConfig {
- CacheIndexesConfig::All
-}
-
-fn index_cache_none() -> CacheIndexesConfig {
- CacheIndexesConfig::None
-}
-
-fn index_cache_open_segment() -> CacheIndexesConfig {
- CacheIndexesConfig::OpenSegment
-}
-
-fn small_batches() -> Vec<u32> {
- vec![3, 4, 5, 6, 7]
-}
-
-fn medium_batches() -> Vec<u32> {
- vec![10, 20, 30, 40]
-}
-
-fn large_batches() -> Vec<u32> {
- vec![100, 200, 300, 400]
-}
-
-fn very_large_batches() -> Vec<u32> {
- vec![500, 1000, 1500, 1000]
-}
-
-#[test_matrix(
- [msg_size(50), msg_size(1000), msg_size(20000)],
- [small_batches(), medium_batches(), large_batches(), very_large_batches()],
- [msgs_req_to_save(1), msgs_req_to_save(24), msgs_req_to_save(1000),
msgs_req_to_save(10000)],
- [segment_size(10), segment_size(200), segment_size(10000000)],
- [index_cache_none(), index_cache_all(), index_cache_open_segment()])]
-#[compio::test]
-async fn test_get_messages_by_offset(
- message_size: IggyByteSize,
- batch_lengths: Vec<u32>,
- messages_required_to_save: u32,
- segment_size: IggyByteSize,
- cache_indexes: CacheIndexesConfig,
-) {
- println!(
- "Running test with message_size: {message_size}, batches:
{batch_lengths:?}, messages_required_to_save: {messages_required_to_save},
segment_size: {segment_size}, cache_indexes: {cache_indexes}"
- );
-
- let shard_id = 0;
- let setup = TestSetup::init().await;
-
- let total_messages_count = batch_lengths.iter().sum();
-
- let config = SystemConfig {
- path: setup.config.path.to_string(),
- partition: PartitionConfig {
- messages_required_to_save,
- enforce_fsync: true,
- ..Default::default()
- },
- segment: SegmentConfig {
- cache_indexes,
- size: segment_size,
- ..Default::default()
- },
- ..Default::default()
- };
-
- // Use the bootstrap method to create streams with proper slab structure
- let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
- .await
- .unwrap();
- let streams = bootstrap_result.streams;
- let stream_identifier = bootstrap_result.stream_id;
- let topic_identifier = bootstrap_result.topic_id;
- let partition_id = bootstrap_result.partition_id;
- let task_registry = bootstrap_result.task_registry;
-
- // Create namespace for MainOps calls
- let namespace = IggyFullNamespace::new(
- stream_identifier.clone(),
- topic_identifier.clone(),
- partition_id,
- );
-
- let mut all_messages = Vec::with_capacity(total_messages_count as usize);
-
- // Generate all messages as defined in the test matrix
- for i in 1..=total_messages_count {
- let id = i as u128;
- let beginning_of_payload = format!("message {i}");
- let mut payload = BytesMut::new();
- payload.extend_from_slice(beginning_of_payload.as_bytes());
- payload.resize(message_size.as_bytes_usize(), 0xD);
- let payload = payload.freeze();
-
- let mut headers = HashMap::new();
- headers.insert(
- HeaderKey::new("key_1").unwrap(),
- HeaderValue::from_str("Value 1").unwrap(),
- );
- headers.insert(
- HeaderKey::new("key 2").unwrap(),
- HeaderValue::from_bool(true).unwrap(),
- );
- headers.insert(
- HeaderKey::new("key-3").unwrap(),
- HeaderValue::from_uint64(123456).unwrap(),
- );
-
- let message = IggyMessage::builder()
- .id(id)
- .payload(payload)
- .user_headers(headers)
- .build()
- .expect("Failed to create message with valid payload and headers");
- all_messages.push(message);
- }
-
- // Keep track of offsets after each batch
- let mut batch_offsets = Vec::with_capacity(batch_lengths.len());
- let mut current_pos = 0;
-
- // Append all batches as defined in the test matrix
- for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
- // If we've generated too many messages, skip the rest
- if current_pos + batch_len as usize > all_messages.len() {
- break;
- }
-
- println!(
- "Appending batch {}/{} with {} messages",
- batch_idx + 1,
- batch_lengths.len(),
- batch_len
- );
-
- let batch_end_pos = current_pos + batch_len as usize;
- let messages_slice_to_append =
&all_messages[current_pos..batch_end_pos];
-
- let messages_size = messages_slice_to_append
- .iter()
- .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
- .sum();
-
- let batch =
IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
- assert_eq!(batch.count(), batch_len);
- streams
- .append_messages(&config, &task_registry, &namespace, batch)
- .await
- .unwrap();
-
- // Get current offset after appending
- let current_offset = streams.with_partition_by_id(
- &stream_identifier,
- &topic_identifier,
- partition_id,
- |(_, _, _, offset, ..)|
offset.load(std::sync::atomic::Ordering::Relaxed),
- );
- batch_offsets.push(current_offset);
- current_pos += batch_len as usize;
- }
-
- // Use the exact total messages count from the test matrix
- let total_sent_messages = total_messages_count;
-
- // Create a single consumer to reuse throughout the test
- let consumer =
- PollingConsumer::consumer(&Identifier::numeric(1).unwrap(),
partition_id as usize);
-
- // Test 1: All messages from start
- let args = PollingArgs::new(PollingStrategy::offset(0),
total_sent_messages, false);
- let (_, all_loaded_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- all_loaded_messages.count(),
- total_sent_messages,
- "Expected {} messages from start, but got {}",
- total_sent_messages,
- all_loaded_messages.count()
- );
-
- // Test 2: Get messages from middle (after 3rd batch)
- if batch_offsets.len() >= 3 {
- let middle_offset = batch_offsets[2];
- let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
- let remaining_messages = total_sent_messages - prior_batches_sum;
-
- let args = PollingArgs::new(
- PollingStrategy::offset(middle_offset + 1),
- remaining_messages,
- false,
- );
- let (_, middle_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
-
- assert_eq!(
- middle_messages.count(),
- remaining_messages,
- "Expected {} messages from middle offset, but got {}",
- remaining_messages,
- middle_messages.count()
- );
- }
-
- // Test 3: No messages beyond final offset
- if !batch_offsets.is_empty() {
- let final_offset = *batch_offsets.last().unwrap();
- let args = PollingArgs::new(PollingStrategy::offset(final_offset + 1),
1, false);
- let (_, no_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- no_messages.count(),
- 0,
- "Expected no messages beyond final offset, but got {}",
- no_messages.count()
- );
- }
-
- // Test 4: Small subset from start
- let subset_size = std::cmp::min(3, total_sent_messages);
- let args = PollingArgs::new(PollingStrategy::offset(0), subset_size,
false);
- let (_, subset_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- subset_messages.count(),
- subset_size,
- "Expected {} messages in subset from start, but got {}",
- subset_size,
- subset_messages.count()
- );
-
- // Test 5: Messages spanning multiple batches
- if batch_offsets.len() >= 4 {
- let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd
batch
- let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
- let args = PollingArgs::new(PollingStrategy::offset(span_offset),
span_size, false);
- let (_, batches) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- batches.count(),
- span_size,
- "Expected {} messages spanning multiple batches, but got {}",
- span_size,
- batches.count()
- );
-
- // Test 6: Validate message content and ordering for all messages
- let mut i = 0;
-
- for batch in batches.iter() {
- for msg in batch.iter() {
- let expected_offset = span_offset + i as u64;
- assert!(
- msg.header().offset() >= expected_offset,
- "Message offset {} at position {} should be >= expected
offset {}",
- msg.header().offset(),
- i,
- expected_offset
- );
-
- let original_offset = msg.header().offset() as usize;
- if original_offset < all_messages.len() {
- let original_message = &all_messages[original_offset];
-
- let loaded_id = msg.header().id();
- let original_id = original_message.header.id;
- assert_eq!(
- loaded_id,
- original_id,
- "Message ID mismatch at offset {}",
- msg.header().offset(),
- );
-
- let loaded_payload = msg.payload();
- let original_payload = &original_message.payload;
- assert_eq!(
- loaded_payload,
- original_payload,
- "Payload mismatch at offset {}",
- msg.header().offset(),
- );
-
- let loaded_headers =
msg.user_headers_map().unwrap().unwrap();
- let original_headers =
-
HashMap::from_bytes(original_message.user_headers.clone().unwrap())
- .unwrap();
- assert_eq!(
- loaded_headers,
- original_headers,
- "Headers mismatch at offset {}",
- msg.header().offset(),
- );
- }
- i += 1;
- }
- }
- }
-
- // Add sequential read test for all batch sizes
- println!("Verifying sequential reads, expecting {total_sent_messages}
messages");
-
- let chunk_size = 500;
- let mut verified_count = 0;
-
- for chunk_start in (0..total_sent_messages).step_by(chunk_size as usize) {
- let read_size = std::cmp::min(chunk_size, total_sent_messages -
chunk_start);
-
- let args = PollingArgs::new(
- PollingStrategy::offset(chunk_start as u64),
- read_size,
- false,
- );
- let (_, chunk) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
-
- assert_eq!(
- chunk.count(),
- read_size,
- "Failed to read chunk at offset {chunk_start} with size
{read_size}"
- );
-
- verified_count += chunk.count();
-
- println!(
- "Read chunk at offset {chunk_start} with size {read_size},
verified count: {verified_count}"
- );
- }
-
- assert_eq!(
- verified_count, total_sent_messages,
- "Sequential chunk reads didn't cover all messages"
- );
-}
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs
b/core/integration/tests/streaming/get_by_timestamp.rs
deleted file mode 100644
index 90afc5967..000000000
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ /dev/null
@@ -1,372 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::bootstrap_test_environment;
-use crate::streaming::common::test_setup::TestSetup;
-use bytes::BytesMut;
-use iggy::prelude::*;
-use server::configs::cache_indexes::CacheIndexesConfig;
-use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
-use server::shard::namespace::IggyFullNamespace;
-use server::shard::system::messages::PollingArgs;
-use server::streaming::polling_consumer::PollingConsumer;
-use server::streaming::segments::IggyMessagesBatchMut;
-use server::streaming::traits::MainOps;
-use std::collections::HashMap;
-use std::str::FromStr;
-use std::thread::sleep;
-use test_case::test_matrix;
-
-/*
- * Below helper functions are here only to make test function name more
readable.
- */
-
-fn msg_size(size: u64) -> IggyByteSize {
- IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn segment_size(size: u64) -> IggyByteSize {
- IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn msgs_req_to_save(count: u32) -> u32 {
- count
-}
-
-fn index_cache_all() -> CacheIndexesConfig {
- CacheIndexesConfig::All
-}
-
-fn index_cache_none() -> CacheIndexesConfig {
- CacheIndexesConfig::None
-}
-
-fn index_cache_open_segment() -> CacheIndexesConfig {
- CacheIndexesConfig::OpenSegment
-}
-
-fn small_batches() -> Vec<u32> {
- vec![3, 4, 5, 6, 7]
-}
-
-fn medium_batches() -> Vec<u32> {
- vec![10, 20, 30, 40]
-}
-
-fn large_batches() -> Vec<u32> {
- vec![100, 200, 300, 400]
-}
-
-fn very_large_batches() -> Vec<u32> {
- vec![500, 1000, 1500, 1000]
-}
-
-#[test_matrix(
- [msg_size(50), msg_size(1000), msg_size(20000)],
- [small_batches(), medium_batches(), large_batches(), very_large_batches()],
- [msgs_req_to_save(1), msgs_req_to_save(24), msgs_req_to_save(1000),
msgs_req_to_save(10000)],
- [segment_size(10), segment_size(200), segment_size(10000000)],
- [index_cache_none(), index_cache_all(), index_cache_open_segment()])]
-#[compio::test]
-async fn test_get_messages_by_timestamp(
- message_size: IggyByteSize,
- batch_lengths: Vec<u32>,
- messages_required_to_save: u32,
- segment_size: IggyByteSize,
- cache_indexes: CacheIndexesConfig,
-) {
- println!(
- "Running test with message_size: {message_size}, batches:
{batch_lengths:?}, messages_required_to_save: {messages_required_to_save},
segment_size: {segment_size}, cache_indexes: {cache_indexes}"
- );
-
- let shard_id = 0;
- let setup = TestSetup::init().await;
-
- let total_messages_count = batch_lengths.iter().sum();
-
- let config = SystemConfig {
- path: setup.config.path.to_string(),
- partition: PartitionConfig {
- messages_required_to_save,
- enforce_fsync: true,
- ..Default::default()
- },
- segment: SegmentConfig {
- cache_indexes,
- size: segment_size,
- ..Default::default()
- },
- ..Default::default()
- };
-
- // Use the bootstrap method to create streams with proper slab structure
- let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
- .await
- .unwrap();
- let streams = bootstrap_result.streams;
- let stream_identifier = bootstrap_result.stream_id;
- let topic_identifier = bootstrap_result.topic_id;
- let partition_id = bootstrap_result.partition_id;
- let task_registry = bootstrap_result.task_registry;
-
- // Create namespace for MainOps calls
- let namespace = IggyFullNamespace::new(
- stream_identifier.clone(),
- topic_identifier.clone(),
- partition_id,
- );
-
- let mut all_messages = Vec::with_capacity(total_messages_count as usize);
-
- // Generate all messages as defined in the test matrix
- for i in 1..=total_messages_count {
- let id = i as u128;
- let beginning_of_payload = format!("message {i}");
- let mut payload = BytesMut::new();
- payload.extend_from_slice(beginning_of_payload.as_bytes());
- payload.resize(message_size.as_bytes_usize(), 0xD);
- let payload = payload.freeze();
-
- let mut headers = HashMap::new();
- headers.insert(
- HeaderKey::new("key_1").unwrap(),
- HeaderValue::from_str("Value 1").unwrap(),
- );
- headers.insert(
- HeaderKey::new("key 2").unwrap(),
- HeaderValue::from_bool(true).unwrap(),
- );
- headers.insert(
- HeaderKey::new("key-3").unwrap(),
- HeaderValue::from_uint64(123456).unwrap(),
- );
-
- let message = IggyMessage::builder()
- .id(id)
- .payload(payload)
- .user_headers(headers)
- .build()
- .expect("Failed to create message with valid payload and headers");
-
- all_messages.push(message);
- }
-
- // Timestamp tracking for messages
- let initial_timestamp = IggyTimestamp::now();
- let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
- let mut current_pos = 0;
-
- // Append all batches as defined in the test matrix with separate
timestamps
- for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
- // Add a small delay between batches to ensure distinct timestamps
- sleep(std::time::Duration::from_millis(2));
-
- // If we've generated too many messages, skip the rest
- if current_pos + batch_len as usize > all_messages.len() {
- break;
- }
-
- println!(
- "Appending batch {}/{} with {} messages",
- batch_idx + 1,
- batch_lengths.len(),
- batch_len
- );
-
- let batch_end_pos = current_pos + batch_len as usize;
- let messages_slice_to_append =
&all_messages[current_pos..batch_end_pos];
-
- let messages_size = messages_slice_to_append
- .iter()
- .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
- .sum();
-
- let batch =
IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
- assert_eq!(batch.count(), batch_len);
- streams
- .append_messages(&config, &task_registry, &namespace, batch)
- .await
- .unwrap();
-
- // Capture the timestamp of this batch
- batch_timestamps.push(IggyTimestamp::now());
- current_pos += batch_len as usize;
-
- // Add a small delay between batches to ensure distinct timestamps
- sleep(std::time::Duration::from_millis(2));
- }
-
- let final_timestamp = IggyTimestamp::now();
-
- // Use the exact total messages count from the test matrix
- let total_sent_messages = total_messages_count;
-
- // Create a single consumer to reuse throughout the test
- let consumer =
- PollingConsumer::consumer(&Identifier::numeric(1).unwrap(),
partition_id as usize);
-
- // Test 1: All messages from initial timestamp
- let args = PollingArgs::new(
- PollingStrategy::timestamp(initial_timestamp),
- total_sent_messages,
- false,
- );
- let (_, all_loaded_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- all_loaded_messages.count(),
- total_sent_messages,
- "Expected {} messages from initial timestamp, but got {}",
- total_sent_messages,
- all_loaded_messages.count()
- );
-
- // Test 2: Get messages from middle timestamp (after 3rd batch)
- if batch_timestamps.len() >= 3 {
- // Use a timestamp that's just before the 3rd batch's timestamp to
ensure we get messages
- // from that batch onwards
- let middle_timestamp =
IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000);
-
- // Calculate how many messages should be in batches after the 3rd
- let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
- let remaining_messages = total_sent_messages - prior_batches_sum;
-
- let args = PollingArgs::new(
- PollingStrategy::timestamp(middle_timestamp),
- remaining_messages,
- false,
- );
- let (_, middle_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
-
- assert_eq!(
- middle_messages.count(),
- remaining_messages,
- "Expected {} messages from middle timestamp, but got {}",
- remaining_messages,
- middle_messages.count()
- );
- }
-
- // Test 3: No messages after final timestamp
- let args = PollingArgs::new(PollingStrategy::timestamp(final_timestamp),
1, false);
- let (_, no_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- no_messages.count(),
- 0,
- "Expected no messages after final timestamp, but got {}",
- no_messages.count()
- );
-
- // Test 4: Small subset from initial timestamp
- let subset_size = std::cmp::min(3, total_sent_messages);
- let args = PollingArgs::new(
- PollingStrategy::timestamp(initial_timestamp),
- subset_size,
- false,
- );
- let (_, subset_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
- assert_eq!(
- subset_messages.count(),
- subset_size,
- "Expected {} messages in subset from initial timestamp, but got {}",
- subset_size,
- subset_messages.count()
- );
-
- // Test 5: Messages spanning multiple batches by timestamp
- if batch_timestamps.len() >= 4 {
- // Use a timestamp that's just before the 2nd batch's timestamp
- let span_timestamp =
IggyTimestamp::from(batch_timestamps[1].as_micros() + 1000);
- let span_size = 8; // Should span across multiple batches
-
- let args =
PollingArgs::new(PollingStrategy::timestamp(span_timestamp), span_size, false);
- let (_, spanning_messages) = streams
- .poll_messages(&namespace, consumer, args)
- .await
- .unwrap();
-
- assert_eq!(
- spanning_messages.count(),
- span_size,
- "Expected {} messages spanning multiple batches, but got {}",
- span_size,
- spanning_messages.count()
- );
-
- // Verify that all messages have timestamps >= our reference timestamp
- let span_timestamp_micros = span_timestamp.as_micros();
-
- // Test 6: Validate message content and ordering
- for batch in spanning_messages.iter() {
- for msg in batch.iter() {
- let msg_timestamp = msg.header().timestamp();
- assert!(
- msg_timestamp >= span_timestamp_micros,
- "Message timestamp {msg_timestamp} should be >= span
timestamp {span_timestamp_micros}"
- );
-
- // Verify message content
- let loaded_id = msg.header().id();
- let original_offset = msg.header().offset() as usize;
-
- if original_offset < all_messages.len() {
- let original_message = &all_messages[original_offset];
- let original_id = original_message.header.id;
-
- assert_eq!(
- loaded_id,
- original_id,
- "Message ID mismatch at offset {}",
- msg.header().offset(),
- );
-
- let loaded_payload = msg.payload();
- let original_payload = &original_message.payload;
- assert_eq!(
- loaded_payload,
- original_payload,
- "Payload mismatch at offset {}",
- msg.header().offset(),
- );
-
- let loaded_headers =
msg.user_headers_map().unwrap().unwrap();
- let original_headers =
-
HashMap::from_bytes(original_message.user_headers.clone().unwrap())
- .unwrap();
- assert_eq!(
- loaded_headers,
- original_headers,
- "Headers mismatch at offset {}",
- msg.header().offset(),
- );
- }
- }
- }
- }
-}
diff --git a/core/integration/tests/streaming/mod.rs
b/core/integration/tests/streaming/mod.rs
deleted file mode 100644
index 6ad773647..000000000
--- a/core/integration/tests/streaming/mod.rs
+++ /dev/null
@@ -1,132 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry,
MaxTopicSize};
-use server::{
- configs::system::SystemConfig,
- shard::{task_registry::TaskRegistry,
transmission::connector::ShardConnector},
- slab::{streams::Streams, traits_ext::EntityMarker},
- streaming::{
- self,
- partitions::{partition, storage::create_partition_file_hierarchy},
- segments::{Segment, storage::create_segment_storage},
- streams::{storage::create_stream_file_hierarchy, stream},
- topics::{storage::create_topic_file_hierarchy, topic},
- },
-};
-use std::rc::Rc;
-
-mod common;
-mod get_by_offset;
-mod get_by_timestamp;
-mod snapshot;
-
-struct BootstrapResult {
- streams: Streams,
- stream_id: Identifier,
- topic_id: Identifier,
- partition_id: usize,
- task_registry: Rc<TaskRegistry>,
-}
-
-async fn bootstrap_test_environment(
- shard_id: u16,
- config: &SystemConfig,
-) -> Result<BootstrapResult, IggyError> {
- let stream_name = "stream-1".to_owned();
- let topic_name = "topic-1".to_owned();
- let topic_expiry = IggyExpiry::NeverExpire;
- let topic_size = MaxTopicSize::Unlimited;
- let partitions_count = 1;
-
- let streams = Streams::default();
- // Create stream together with its dirs
- let stream = stream::create_and_insert_stream_mem(&streams, stream_name);
- create_stream_file_hierarchy(stream.id(), config).await?;
- // Create topic together with its dirs
- let stream_id = Identifier::numeric(stream.id() as u32).unwrap();
- let parent_stats = streams.with_stream_by_id(&stream_id, |(_, stats)|
stats.clone());
- let message_expiry = config.resolve_message_expiry(topic_expiry);
- let max_topic_size = config.resolve_max_topic_size(topic_size)?;
-
- let topic = topic::create_and_insert_topics_mem(
- &streams,
- &stream_id,
- topic_name,
- 1,
- message_expiry,
- CompressionAlgorithm::default(),
- max_topic_size,
- parent_stats,
- );
- create_topic_file_hierarchy(stream.id(), topic.id(), config).await?;
- // Create partition together with its dirs
- let topic_id = Identifier::numeric(topic.id() as u32).unwrap();
- let parent_stats = streams.with_topic_by_id(
- &stream_id,
- &topic_id,
- streaming::topics::helpers::get_stats(),
- );
- let partitions = partition::create_and_insert_partitions_mem(
- &streams,
- &stream_id,
- &topic_id,
- parent_stats,
- partitions_count,
- config,
- );
- for partition in partitions {
- create_partition_file_hierarchy(stream.id(), topic.id(),
partition.id(), config).await?;
-
- // Open the log
- let start_offset = 0;
- let segment = Segment::new(
- start_offset,
- config.segment.size,
- config.segment.message_expiry,
- );
- let messages_size = 0;
- let indexes_size = 0;
- let storage = create_segment_storage(
- config,
- stream.id(),
- topic.id(),
- partition.id(),
- messages_size,
- indexes_size,
- start_offset,
- )
- .await?;
-
- streams.with_partition_by_id_mut(&stream_id, &topic_id,
partition.id(), |(.., log)| {
- log.add_persisted_segment(segment, storage);
- });
- }
-
- // Create a test task registry with dummy stop sender from ShardConnector
- let connector: ShardConnector<()> = ShardConnector::new(shard_id);
- let task_registry = Rc::new(TaskRegistry::new(shard_id,
vec![connector.stop_sender]));
-
- Ok(BootstrapResult {
- streams,
- stream_id,
- topic_id,
- partition_id: 0,
- task_registry,
- })
-}