This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch get-by-offset-timestamp-improvement in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b0eaa90ff4cd97f549ddf597685d77a4f6dafebd Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Jan 20 15:16:44 2026 +0100 test(server): remove streaming tests superseded by API-level coverage The streaming-layer tests (get_by_offset, get_by_timestamp) are redundant now that get_messages_by_offset_api.rs and get_messages_by_timestamp_api.rs cover the same scenarios through the full client stack. Additionally, the snapshot test was migrated to an API-level scenario. --- core/integration/tests/mod.rs | 1 - core/integration/tests/server/cg.rs | 10 +- core/integration/tests/server/general.rs | 3 +- core/integration/tests/server/message_retrieval.rs | 10 +- core/integration/tests/server/mod.rs | 7 +- core/integration/tests/server/scenarios/mod.rs | 5 +- ...essages_by_offset_api.rs => offset_scenario.rs} | 43 +++ .../scenarios/snapshot_scenario.rs} | 44 +-- ...s_by_timestamp_api.rs => timestamp_scenario.rs} | 0 core/integration/tests/streaming/common/mod.rs | 19 - .../tests/streaming/common/test_setup.rs | 52 --- core/integration/tests/streaming/get_by_offset.rs | 396 --------------------- .../tests/streaming/get_by_timestamp.rs | 372 ------------------- core/integration/tests/streaming/mod.rs | 132 ------- 14 files changed, 84 insertions(+), 1010 deletions(-) diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs index fb2156d8e..b451f4499 100644 --- a/core/integration/tests/mod.rs +++ b/core/integration/tests/mod.rs @@ -33,7 +33,6 @@ mod mcp; mod sdk; mod server; mod state; -mod streaming; lazy_static! { static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false); diff --git a/core/integration/tests/server/cg.rs b/core/integration/tests/server/cg.rs index 736024c25..bf12c871e 100644 --- a/core/integration/tests/server/cg.rs +++ b/core/integration/tests/server/cg.rs @@ -23,10 +23,18 @@ use iggy_common::TransportProtocol; use serial_test::parallel; use test_case::test_matrix; +fn tcp() -> TransportProtocol { + TransportProtocol::Tcp +} + +fn websocket() -> TransportProtocol { + TransportProtocol::WebSocket +} + // TODO: Add `QUIC`. // Consumer group scenarios do not support HTTP #[test_matrix( - [TransportProtocol::Tcp, TransportProtocol::WebSocket], + [tcp(), websocket()], [ join_scenario(), single_client_scenario(), diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs index 50277cc3c..ec88d6ffc 100644 --- a/core/integration/tests/server/general.rs +++ b/core/integration/tests/server/general.rs @@ -18,7 +18,7 @@ use crate::server::{ ScenarioFn, authentication_scenario, bench_scenario, consumer_timestamp_polling_scenario, create_message_payload_scenario, message_headers_scenario, permissions_scenario, run_scenario, - stream_size_validation_scenario, system_scenario, user_scenario, + snapshot_scenario, stream_size_validation_scenario, system_scenario, user_scenario, }; use iggy_common::TransportProtocol; use serial_test::parallel; @@ -36,6 +36,7 @@ use test_case::test_matrix; stream_size_validation_scenario(), bench_scenario(), consumer_timestamp_polling_scenario(), + snapshot_scenario(), ] )] #[tokio::test] diff --git a/core/integration/tests/server/message_retrieval.rs b/core/integration/tests/server/message_retrieval.rs index 7061d27a0..24a4ff0d0 100644 --- a/core/integration/tests/server/message_retrieval.rs +++ b/core/integration/tests/server/message_retrieval.rs @@ -16,7 +16,7 @@ * under the License. 
*/ -use crate::server::scenarios::{get_messages_by_offset_api, get_messages_by_timestamp_api}; +use crate::server::scenarios::{offset_scenario, timestamp_scenario}; use integration::{ tcp_client::TcpClientFactory, test_server::{IpAddrKind, TestServer}, @@ -72,7 +72,7 @@ fn msgs_req_9984() -> u32 { )] #[tokio::test] #[parallel] -async fn test_get_messages_by_offset_api( +async fn get_by_offset_scenario( segment_size: &str, cache_indexes: &str, messages_required_to_save: u32, @@ -105,7 +105,7 @@ async fn test_get_messages_by_offset_api( ..Default::default() }; - get_messages_by_offset_api::run(&client_factory).await; + offset_scenario::run(&client_factory).await; } #[test_matrix( @@ -115,7 +115,7 @@ async fn test_get_messages_by_offset_api( )] #[tokio::test] #[parallel] -async fn test_get_messages_by_timestamp_api( +async fn get_by_timestamp_scenario( segment_size: &str, cache_indexes: &str, messages_required_to_save: u32, @@ -148,5 +148,5 @@ async fn test_get_messages_by_timestamp_api( ..Default::default() }; - get_messages_by_timestamp_api::run(&client_factory).await; + timestamp_scenario::run(&client_factory).await; } diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs index 1a9285640..65ee5d2c4 100644 --- a/core/integration/tests/server/mod.rs +++ b/core/integration/tests/server/mod.rs @@ -37,7 +37,8 @@ use scenarios::{ consumer_group_with_multiple_clients_polling_messages_scenario, consumer_group_with_single_client_polling_messages_scenario, consumer_timestamp_polling_scenario, create_message_payload, message_headers_scenario, - permissions_scenario, stream_size_validation_scenario, system_scenario, user_scenario, + permissions_scenario, snapshot_scenario, stream_size_validation_scenario, system_scenario, + user_scenario, }; use std::pin::Pin; use std::{collections::HashMap, future::Future}; @@ -100,6 +101,10 @@ fn permissions_scenario() -> ScenarioFn { |factory| Box::pin(permissions_scenario::run(factory)) } +fn snapshot_scenario() -> ScenarioFn { + |factory| Box::pin(snapshot_scenario::run(factory)) +} + fn consumer_timestamp_polling_scenario() -> ScenarioFn { |factory| Box::pin(consumer_timestamp_polling_scenario::run(factory)) } diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 58cc8a3b2..3253493c4 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -29,18 +29,19 @@ pub mod create_message_payload; pub mod cross_protocol_pat_scenario; pub mod delete_segments_scenario; pub mod encryption_scenario; -pub mod get_messages_by_offset_api; -pub mod get_messages_by_timestamp_api; pub mod message_headers_scenario; pub mod message_size_scenario; +pub mod offset_scenario; pub mod permissions_scenario; pub mod read_during_persistence_scenario; pub mod segment_rotation_race_scenario; pub mod single_message_per_batch_scenario; +pub mod snapshot_scenario; pub mod stale_client_consumer_group_scenario; pub mod stream_size_validation_scenario; pub mod system_scenario; pub mod tcp_tls_scenario; +pub mod timestamp_scenario; pub mod user_scenario; pub mod websocket_tls_scenario; diff --git a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs b/core/integration/tests/server/scenarios/offset_scenario.rs similarity index 90% rename from core/integration/tests/server/scenarios/get_messages_by_offset_api.rs rename to core/integration/tests/server/scenarios/offset_scenario.rs index 568fd847d..cc519bd63 100644 --- 
a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs +++ b/core/integration/tests/server/scenarios/offset_scenario.rs @@ -107,6 +107,7 @@ async fn run_offset_test( &batch_offsets, ) .await; + verify_sequential_chunk_reads(client, &stream_name, &topic_name, total_messages_count).await; cleanup(client, &stream_name).await; } @@ -416,6 +417,48 @@ async fn verify_message_content( } } +async fn verify_sequential_chunk_reads( + client: &IggyClient, + stream_name: &str, + topic_name: &str, + total_messages: u32, +) { + let chunk_size = 500u32; + let mut verified_count = 0u32; + + for chunk_start in (0..total_messages).step_by(chunk_size as usize) { + let read_size = std::cmp::min(chunk_size, total_messages - chunk_start); + + let polled = client + .poll_messages( + &Identifier::named(stream_name).unwrap(), + &Identifier::named(topic_name).unwrap(), + Some(PARTITION_ID), + &Consumer::default(), + &PollingStrategy::offset(chunk_start as u64), + read_size, + false, + ) + .await + .unwrap(); + + assert_eq!( + polled.messages.len() as u32, + read_size, + "Failed to read chunk at offset {} with size {}", + chunk_start, + read_size + ); + + verified_count += polled.messages.len() as u32; + } + + assert_eq!( + verified_count, total_messages, + "Sequential chunk reads didn't cover all messages" + ); +} + async fn cleanup(client: &IggyClient, stream_name: &str) { client .delete_stream(&Identifier::named(stream_name).unwrap()) diff --git a/core/integration/tests/streaming/snapshot.rs b/core/integration/tests/server/scenarios/snapshot_scenario.rs similarity index 56% rename from core/integration/tests/streaming/snapshot.rs rename to core/integration/tests/server/scenarios/snapshot_scenario.rs index 76c9fa971..0780ca6c3 100644 --- a/core/integration/tests/streaming/snapshot.rs +++ b/core/integration/tests/server/scenarios/snapshot_scenario.rs @@ -16,40 +16,28 @@ * under the License. 
*/ -//TODO: Fix me use shard instead of system -/* -use crate::streaming::common::test_setup::TestSetup; -use iggy::prelude::{SnapshotCompression, SystemSnapshotType}; -use server::configs::cluster::ClusterConfig; -use server::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; -use server::streaming::session::Session; +use iggy::prelude::*; +use integration::test_server::{ClientFactory, assert_clean_system}; use std::io::{Cursor, Read}; -use std::net::{Ipv4Addr, SocketAddr}; use zip::ZipArchive; +pub async fn run(client_factory: &dyn ClientFactory) { + let client = client_factory.create_client().await; + let client = IggyClient::create(client, None, None); -#[tokio::test] -async fn should_create_snapshot_file() { - let setup = TestSetup::init().await; - let mut system = System::new( - setup.config.clone(), - ClusterConfig::default(), - DataMaintenanceConfig::default(), - PersonalAccessTokenConfig::default(), - ); - - system.init().await.unwrap(); - - let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); - let snapshot = system - .get_snapshot( - &session, + let snapshot = client + .snapshot( SnapshotCompression::Deflated, - &vec![SystemSnapshotType::Test], + vec![SystemSnapshotType::Test], ) .await .unwrap(); + assert!(!snapshot.0.is_empty()); let cursor = Cursor::new(snapshot.0); @@ -57,7 +45,7 @@ async fn should_create_snapshot_file() { let mut test_file = zip.by_name("test.txt").unwrap(); let mut test_content = String::new(); test_file.read_to_string(&mut test_content).unwrap(); - assert_eq!(test_content, "test\n"); -} + assert_eq!(test_content.trim(), "test"); -*/ + assert_clean_system(&client).await; +} diff --git a/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs b/core/integration/tests/server/scenarios/timestamp_scenario.rs similarity index 100% rename from core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs rename to core/integration/tests/server/scenarios/timestamp_scenario.rs diff --git a/core/integration/tests/streaming/common/mod.rs b/core/integration/tests/streaming/common/mod.rs deleted file mode 100644 index 6bab3f356..000000000 --- a/core/integration/tests/streaming/common/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -pub mod test_setup; diff --git a/core/integration/tests/streaming/common/test_setup.rs b/core/integration/tests/streaming/common/test_setup.rs deleted file mode 100644 index f85aa73ea..000000000 --- a/core/integration/tests/streaming/common/test_setup.rs +++ /dev/null @@ -1,52 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use compio::fs; -use iggy_common::MemoryPool; -use server::bootstrap::create_directories; -use server::configs::system::SystemConfig; -use std::sync::Arc; -use uuid::Uuid; - -pub struct TestSetup { - pub config: Arc<SystemConfig>, -} - -impl TestSetup { - pub async fn init() -> TestSetup { - Self::init_with_config(SystemConfig::default()).await - } - - pub async fn init_with_config(mut config: SystemConfig) -> TestSetup { - config.path = format!("local_data_{}", Uuid::now_v7().to_u128_le()); - config.partition.enforce_fsync = true; - config.state.enforce_fsync = true; - - let config = Arc::new(config); - fs::create_dir(config.get_system_path()).await.unwrap(); - create_directories(&config).await.unwrap(); - MemoryPool::init_pool(&config.memory_pool.into_other()); - TestSetup { config } - } -} - -impl Drop for TestSetup { - fn drop(&mut self) { - std::fs::remove_dir_all(self.config.get_system_path()).unwrap(); - } -} diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs deleted file mode 100644 index ca5438529..000000000 --- a/core/integration/tests/streaming/get_by_offset.rs +++ /dev/null @@ -1,396 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-*/ - -use super::bootstrap_test_environment; -use crate::streaming::common::test_setup::TestSetup; -use bytes::BytesMut; -use iggy::prelude::*; -use server::configs::cache_indexes::CacheIndexesConfig; -use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig}; -use server::shard::namespace::IggyFullNamespace; -use server::shard::system::messages::PollingArgs; -use server::streaming::polling_consumer::PollingConsumer; -use server::streaming::segments::IggyMessagesBatchMut; -use server::streaming::traits::MainOps; -use std::collections::HashMap; -use std::str::FromStr; -use test_case::test_matrix; - -/* - * Below helper functions are here only to make test function name more readable. - */ - -fn msg_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{size}B")).unwrap() -} - -fn segment_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{size}B")).unwrap() -} - -fn msgs_req_to_save(count: u32) -> u32 { - count -} - -fn index_cache_all() -> CacheIndexesConfig { - CacheIndexesConfig::All -} - -fn index_cache_none() -> CacheIndexesConfig { - CacheIndexesConfig::None -} - -fn index_cache_open_segment() -> CacheIndexesConfig { - CacheIndexesConfig::OpenSegment -} - -fn small_batches() -> Vec<u32> { - vec![3, 4, 5, 6, 7] -} - -fn medium_batches() -> Vec<u32> { - vec![10, 20, 30, 40] -} - -fn large_batches() -> Vec<u32> { - vec![100, 200, 300, 400] -} - -fn very_large_batches() -> Vec<u32> { - vec![500, 1000, 1500, 1000] -} - -#[test_matrix( - [msg_size(50), msg_size(1000), msg_size(20000)], - [small_batches(), medium_batches(), large_batches(), very_large_batches()], - [msgs_req_to_save(1), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(10000)], - [segment_size(10), segment_size(200), segment_size(10000000)], - [index_cache_none(), index_cache_all(), index_cache_open_segment()])] -#[compio::test] -async fn test_get_messages_by_offset( - message_size: IggyByteSize, - batch_lengths: Vec<u32>, - messages_required_to_save: u32, - segment_size: IggyByteSize, - cache_indexes: CacheIndexesConfig, -) { - println!( - "Running test with message_size: {message_size}, batches: {batch_lengths:?}, messages_required_to_save: {messages_required_to_save}, segment_size: {segment_size}, cache_indexes: {cache_indexes}" - ); - - let shard_id = 0; - let setup = TestSetup::init().await; - - let total_messages_count = batch_lengths.iter().sum(); - - let config = SystemConfig { - path: setup.config.path.to_string(), - partition: PartitionConfig { - messages_required_to_save, - enforce_fsync: true, - ..Default::default() - }, - segment: SegmentConfig { - cache_indexes, - size: segment_size, - ..Default::default() - }, - ..Default::default() - }; - - // Use the bootstrap method to create streams with proper slab structure - let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config) - .await - .unwrap(); - let streams = bootstrap_result.streams; - let stream_identifier = bootstrap_result.stream_id; - let topic_identifier = bootstrap_result.topic_id; - let partition_id = bootstrap_result.partition_id; - let task_registry = bootstrap_result.task_registry; - - // Create namespace for MainOps calls - let namespace = IggyFullNamespace::new( - stream_identifier.clone(), - topic_identifier.clone(), - partition_id, - ); - - let mut all_messages = Vec::with_capacity(total_messages_count as usize); - - // Generate all messages as defined in the test matrix - for i in 1..=total_messages_count { - let id = i as u128; - let beginning_of_payload = 
format!("message {i}"); - let mut payload = BytesMut::new(); - payload.extend_from_slice(beginning_of_payload.as_bytes()); - payload.resize(message_size.as_bytes_usize(), 0xD); - let payload = payload.freeze(); - - let mut headers = HashMap::new(); - headers.insert( - HeaderKey::new("key_1").unwrap(), - HeaderValue::from_str("Value 1").unwrap(), - ); - headers.insert( - HeaderKey::new("key 2").unwrap(), - HeaderValue::from_bool(true).unwrap(), - ); - headers.insert( - HeaderKey::new("key-3").unwrap(), - HeaderValue::from_uint64(123456).unwrap(), - ); - - let message = IggyMessage::builder() - .id(id) - .payload(payload) - .user_headers(headers) - .build() - .expect("Failed to create message with valid payload and headers"); - all_messages.push(message); - } - - // Keep track of offsets after each batch - let mut batch_offsets = Vec::with_capacity(batch_lengths.len()); - let mut current_pos = 0; - - // Append all batches as defined in the test matrix - for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() { - // If we've generated too many messages, skip the rest - if current_pos + batch_len as usize > all_messages.len() { - break; - } - - println!( - "Appending batch {}/{} with {} messages", - batch_idx + 1, - batch_lengths.len(), - batch_len - ); - - let batch_end_pos = current_pos + batch_len as usize; - let messages_slice_to_append = &all_messages[current_pos..batch_end_pos]; - - let messages_size = messages_slice_to_append - .iter() - .map(|m| m.get_size_bytes().as_bytes_u64() as u32) - .sum(); - - let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size); - assert_eq!(batch.count(), batch_len); - streams - .append_messages(&config, &task_registry, &namespace, batch) - .await - .unwrap(); - - // Get current offset after appending - let current_offset = streams.with_partition_by_id( - &stream_identifier, - &topic_identifier, - partition_id, - |(_, _, _, offset, ..)| offset.load(std::sync::atomic::Ordering::Relaxed), - ); - batch_offsets.push(current_offset); - current_pos += batch_len as usize; - } - - // Use the exact total messages count from the test matrix - let total_sent_messages = total_messages_count; - - // Create a single consumer to reuse throughout the test - let consumer = - PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize); - - // Test 1: All messages from start - let args = PollingArgs::new(PollingStrategy::offset(0), total_sent_messages, false); - let (_, all_loaded_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - all_loaded_messages.count(), - total_sent_messages, - "Expected {} messages from start, but got {}", - total_sent_messages, - all_loaded_messages.count() - ); - - // Test 2: Get messages from middle (after 3rd batch) - if batch_offsets.len() >= 3 { - let middle_offset = batch_offsets[2]; - let prior_batches_sum: u32 = batch_lengths[..3].iter().sum(); - let remaining_messages = total_sent_messages - prior_batches_sum; - - let args = PollingArgs::new( - PollingStrategy::offset(middle_offset + 1), - remaining_messages, - false, - ); - let (_, middle_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - - assert_eq!( - middle_messages.count(), - remaining_messages, - "Expected {} messages from middle offset, but got {}", - remaining_messages, - middle_messages.count() - ); - } - - // Test 3: No messages beyond final offset - if !batch_offsets.is_empty() { - let final_offset = 
*batch_offsets.last().unwrap(); - let args = PollingArgs::new(PollingStrategy::offset(final_offset + 1), 1, false); - let (_, no_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - no_messages.count(), - 0, - "Expected no messages beyond final offset, but got {}", - no_messages.count() - ); - } - - // Test 4: Small subset from start - let subset_size = std::cmp::min(3, total_sent_messages); - let args = PollingArgs::new(PollingStrategy::offset(0), subset_size, false); - let (_, subset_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - subset_messages.count(), - subset_size, - "Expected {} messages in subset from start, but got {}", - subset_size, - subset_messages.count() - ); - - // Test 5: Messages spanning multiple batches - if batch_offsets.len() >= 4 { - let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch - let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch - let args = PollingArgs::new(PollingStrategy::offset(span_offset), span_size, false); - let (_, batches) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - batches.count(), - span_size, - "Expected {} messages spanning multiple batches, but got {}", - span_size, - batches.count() - ); - - // Test 6: Validate message content and ordering for all messages - let mut i = 0; - - for batch in batches.iter() { - for msg in batch.iter() { - let expected_offset = span_offset + i as u64; - assert!( - msg.header().offset() >= expected_offset, - "Message offset {} at position {} should be >= expected offset {}", - msg.header().offset(), - i, - expected_offset - ); - - let original_offset = msg.header().offset() as usize; - if original_offset < all_messages.len() { - let original_message = &all_messages[original_offset]; - - let loaded_id = msg.header().id(); - let original_id = original_message.header.id; - assert_eq!( - loaded_id, - original_id, - "Message ID mismatch at offset {}", - msg.header().offset(), - ); - - let loaded_payload = msg.payload(); - let original_payload = &original_message.payload; - assert_eq!( - loaded_payload, - original_payload, - "Payload mismatch at offset {}", - msg.header().offset(), - ); - - let loaded_headers = msg.user_headers_map().unwrap().unwrap(); - let original_headers = - HashMap::from_bytes(original_message.user_headers.clone().unwrap()) - .unwrap(); - assert_eq!( - loaded_headers, - original_headers, - "Headers mismatch at offset {}", - msg.header().offset(), - ); - } - i += 1; - } - } - } - - // Add sequential read test for all batch sizes - println!("Verifying sequential reads, expecting {total_sent_messages} messages"); - - let chunk_size = 500; - let mut verified_count = 0; - - for chunk_start in (0..total_sent_messages).step_by(chunk_size as usize) { - let read_size = std::cmp::min(chunk_size, total_sent_messages - chunk_start); - - let args = PollingArgs::new( - PollingStrategy::offset(chunk_start as u64), - read_size, - false, - ); - let (_, chunk) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - - assert_eq!( - chunk.count(), - read_size, - "Failed to read chunk at offset {chunk_start} with size {read_size}" - ); - - verified_count += chunk.count(); - - println!( - "Read chunk at offset {chunk_start} with size {read_size}, verified count: {verified_count}" - ); - } - - assert_eq!( - verified_count, total_sent_messages, - "Sequential chunk reads didn't cover all messages" - ); -} 
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs deleted file mode 100644 index 90afc5967..000000000 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ /dev/null @@ -1,372 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use super::bootstrap_test_environment; -use crate::streaming::common::test_setup::TestSetup; -use bytes::BytesMut; -use iggy::prelude::*; -use server::configs::cache_indexes::CacheIndexesConfig; -use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig}; -use server::shard::namespace::IggyFullNamespace; -use server::shard::system::messages::PollingArgs; -use server::streaming::polling_consumer::PollingConsumer; -use server::streaming::segments::IggyMessagesBatchMut; -use server::streaming::traits::MainOps; -use std::collections::HashMap; -use std::str::FromStr; -use std::thread::sleep; -use test_case::test_matrix; - -/* - * Below helper functions are here only to make test function name more readable. 
- */ - -fn msg_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{size}B")).unwrap() -} - -fn segment_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{size}B")).unwrap() -} - -fn msgs_req_to_save(count: u32) -> u32 { - count -} - -fn index_cache_all() -> CacheIndexesConfig { - CacheIndexesConfig::All -} - -fn index_cache_none() -> CacheIndexesConfig { - CacheIndexesConfig::None -} - -fn index_cache_open_segment() -> CacheIndexesConfig { - CacheIndexesConfig::OpenSegment -} - -fn small_batches() -> Vec<u32> { - vec![3, 4, 5, 6, 7] -} - -fn medium_batches() -> Vec<u32> { - vec![10, 20, 30, 40] -} - -fn large_batches() -> Vec<u32> { - vec![100, 200, 300, 400] -} - -fn very_large_batches() -> Vec<u32> { - vec![500, 1000, 1500, 1000] -} - -#[test_matrix( - [msg_size(50), msg_size(1000), msg_size(20000)], - [small_batches(), medium_batches(), large_batches(), very_large_batches()], - [msgs_req_to_save(1), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(10000)], - [segment_size(10), segment_size(200), segment_size(10000000)], - [index_cache_none(), index_cache_all(), index_cache_open_segment()])] -#[compio::test] -async fn test_get_messages_by_timestamp( - message_size: IggyByteSize, - batch_lengths: Vec<u32>, - messages_required_to_save: u32, - segment_size: IggyByteSize, - cache_indexes: CacheIndexesConfig, -) { - println!( - "Running test with message_size: {message_size}, batches: {batch_lengths:?}, messages_required_to_save: {messages_required_to_save}, segment_size: {segment_size}, cache_indexes: {cache_indexes}" - ); - - let shard_id = 0; - let setup = TestSetup::init().await; - - let total_messages_count = batch_lengths.iter().sum(); - - let config = SystemConfig { - path: setup.config.path.to_string(), - partition: PartitionConfig { - messages_required_to_save, - enforce_fsync: true, - ..Default::default() - }, - segment: SegmentConfig { - cache_indexes, - size: segment_size, - ..Default::default() - }, - ..Default::default() - }; - - // Use the bootstrap method to create streams with proper slab structure - let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config) - .await - .unwrap(); - let streams = bootstrap_result.streams; - let stream_identifier = bootstrap_result.stream_id; - let topic_identifier = bootstrap_result.topic_id; - let partition_id = bootstrap_result.partition_id; - let task_registry = bootstrap_result.task_registry; - - // Create namespace for MainOps calls - let namespace = IggyFullNamespace::new( - stream_identifier.clone(), - topic_identifier.clone(), - partition_id, - ); - - let mut all_messages = Vec::with_capacity(total_messages_count as usize); - - // Generate all messages as defined in the test matrix - for i in 1..=total_messages_count { - let id = i as u128; - let beginning_of_payload = format!("message {i}"); - let mut payload = BytesMut::new(); - payload.extend_from_slice(beginning_of_payload.as_bytes()); - payload.resize(message_size.as_bytes_usize(), 0xD); - let payload = payload.freeze(); - - let mut headers = HashMap::new(); - headers.insert( - HeaderKey::new("key_1").unwrap(), - HeaderValue::from_str("Value 1").unwrap(), - ); - headers.insert( - HeaderKey::new("key 2").unwrap(), - HeaderValue::from_bool(true).unwrap(), - ); - headers.insert( - HeaderKey::new("key-3").unwrap(), - HeaderValue::from_uint64(123456).unwrap(), - ); - - let message = IggyMessage::builder() - .id(id) - .payload(payload) - .user_headers(headers) - .build() - .expect("Failed to create message with valid 
payload and headers"); - - all_messages.push(message); - } - - // Timestamp tracking for messages - let initial_timestamp = IggyTimestamp::now(); - let mut batch_timestamps = Vec::with_capacity(batch_lengths.len()); - let mut current_pos = 0; - - // Append all batches as defined in the test matrix with separate timestamps - for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() { - // Add a small delay between batches to ensure distinct timestamps - sleep(std::time::Duration::from_millis(2)); - - // If we've generated too many messages, skip the rest - if current_pos + batch_len as usize > all_messages.len() { - break; - } - - println!( - "Appending batch {}/{} with {} messages", - batch_idx + 1, - batch_lengths.len(), - batch_len - ); - - let batch_end_pos = current_pos + batch_len as usize; - let messages_slice_to_append = &all_messages[current_pos..batch_end_pos]; - - let messages_size = messages_slice_to_append - .iter() - .map(|m| m.get_size_bytes().as_bytes_u64() as u32) - .sum(); - - let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size); - assert_eq!(batch.count(), batch_len); - streams - .append_messages(&config, &task_registry, &namespace, batch) - .await - .unwrap(); - - // Capture the timestamp of this batch - batch_timestamps.push(IggyTimestamp::now()); - current_pos += batch_len as usize; - - // Add a small delay between batches to ensure distinct timestamps - sleep(std::time::Duration::from_millis(2)); - } - - let final_timestamp = IggyTimestamp::now(); - - // Use the exact total messages count from the test matrix - let total_sent_messages = total_messages_count; - - // Create a single consumer to reuse throughout the test - let consumer = - PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize); - - // Test 1: All messages from initial timestamp - let args = PollingArgs::new( - PollingStrategy::timestamp(initial_timestamp), - total_sent_messages, - false, - ); - let (_, all_loaded_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - all_loaded_messages.count(), - total_sent_messages, - "Expected {} messages from initial timestamp, but got {}", - total_sent_messages, - all_loaded_messages.count() - ); - - // Test 2: Get messages from middle timestamp (after 3rd batch) - if batch_timestamps.len() >= 3 { - // Use a timestamp that's just before the 3rd batch's timestamp to ensure we get messages - // from that batch onwards - let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000); - - // Calculate how many messages should be in batches after the 3rd - let prior_batches_sum: u32 = batch_lengths[..3].iter().sum(); - let remaining_messages = total_sent_messages - prior_batches_sum; - - let args = PollingArgs::new( - PollingStrategy::timestamp(middle_timestamp), - remaining_messages, - false, - ); - let (_, middle_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - - assert_eq!( - middle_messages.count(), - remaining_messages, - "Expected {} messages from middle timestamp, but got {}", - remaining_messages, - middle_messages.count() - ); - } - - // Test 3: No messages after final timestamp - let args = PollingArgs::new(PollingStrategy::timestamp(final_timestamp), 1, false); - let (_, no_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - no_messages.count(), - 0, - "Expected no messages after final timestamp, but got {}", - no_messages.count() - ); 
- - // Test 4: Small subset from initial timestamp - let subset_size = std::cmp::min(3, total_sent_messages); - let args = PollingArgs::new( - PollingStrategy::timestamp(initial_timestamp), - subset_size, - false, - ); - let (_, subset_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - assert_eq!( - subset_messages.count(), - subset_size, - "Expected {} messages in subset from initial timestamp, but got {}", - subset_size, - subset_messages.count() - ); - - // Test 5: Messages spanning multiple batches by timestamp - if batch_timestamps.len() >= 4 { - // Use a timestamp that's just before the 2nd batch's timestamp - let span_timestamp = IggyTimestamp::from(batch_timestamps[1].as_micros() + 1000); - let span_size = 8; // Should span across multiple batches - - let args = PollingArgs::new(PollingStrategy::timestamp(span_timestamp), span_size, false); - let (_, spanning_messages) = streams - .poll_messages(&namespace, consumer, args) - .await - .unwrap(); - - assert_eq!( - spanning_messages.count(), - span_size, - "Expected {} messages spanning multiple batches, but got {}", - span_size, - spanning_messages.count() - ); - - // Verify that all messages have timestamps >= our reference timestamp - let span_timestamp_micros = span_timestamp.as_micros(); - - // Test 6: Validate message content and ordering - for batch in spanning_messages.iter() { - for msg in batch.iter() { - let msg_timestamp = msg.header().timestamp(); - assert!( - msg_timestamp >= span_timestamp_micros, - "Message timestamp {msg_timestamp} should be >= span timestamp {span_timestamp_micros}" - ); - - // Verify message content - let loaded_id = msg.header().id(); - let original_offset = msg.header().offset() as usize; - - if original_offset < all_messages.len() { - let original_message = &all_messages[original_offset]; - let original_id = original_message.header.id; - - assert_eq!( - loaded_id, - original_id, - "Message ID mismatch at offset {}", - msg.header().offset(), - ); - - let loaded_payload = msg.payload(); - let original_payload = &original_message.payload; - assert_eq!( - loaded_payload, - original_payload, - "Payload mismatch at offset {}", - msg.header().offset(), - ); - - let loaded_headers = msg.user_headers_map().unwrap().unwrap(); - let original_headers = - HashMap::from_bytes(original_message.user_headers.clone().unwrap()) - .unwrap(); - assert_eq!( - loaded_headers, - original_headers, - "Headers mismatch at offset {}", - msg.header().offset(), - ); - } - } - } - } -} diff --git a/core/integration/tests/streaming/mod.rs b/core/integration/tests/streaming/mod.rs deleted file mode 100644 index 6ad773647..000000000 --- a/core/integration/tests/streaming/mod.rs +++ /dev/null @@ -1,132 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize}; -use server::{ - configs::system::SystemConfig, - shard::{task_registry::TaskRegistry, transmission::connector::ShardConnector}, - slab::{streams::Streams, traits_ext::EntityMarker}, - streaming::{ - self, - partitions::{partition, storage::create_partition_file_hierarchy}, - segments::{Segment, storage::create_segment_storage}, - streams::{storage::create_stream_file_hierarchy, stream}, - topics::{storage::create_topic_file_hierarchy, topic}, - }, -}; -use std::rc::Rc; - -mod common; -mod get_by_offset; -mod get_by_timestamp; -mod snapshot; - -struct BootstrapResult { - streams: Streams, - stream_id: Identifier, - topic_id: Identifier, - partition_id: usize, - task_registry: Rc<TaskRegistry>, -} - -async fn bootstrap_test_environment( - shard_id: u16, - config: &SystemConfig, -) -> Result<BootstrapResult, IggyError> { - let stream_name = "stream-1".to_owned(); - let topic_name = "topic-1".to_owned(); - let topic_expiry = IggyExpiry::NeverExpire; - let topic_size = MaxTopicSize::Unlimited; - let partitions_count = 1; - - let streams = Streams::default(); - // Create stream together with its dirs - let stream = stream::create_and_insert_stream_mem(&streams, stream_name); - create_stream_file_hierarchy(stream.id(), config).await?; - // Create topic together with its dirs - let stream_id = Identifier::numeric(stream.id() as u32).unwrap(); - let parent_stats = streams.with_stream_by_id(&stream_id, |(_, stats)| stats.clone()); - let message_expiry = config.resolve_message_expiry(topic_expiry); - let max_topic_size = config.resolve_max_topic_size(topic_size)?; - - let topic = topic::create_and_insert_topics_mem( - &streams, - &stream_id, - topic_name, - 1, - message_expiry, - CompressionAlgorithm::default(), - max_topic_size, - parent_stats, - ); - create_topic_file_hierarchy(stream.id(), topic.id(), config).await?; - // Create partition together with its dirs - let topic_id = Identifier::numeric(topic.id() as u32).unwrap(); - let parent_stats = streams.with_topic_by_id( - &stream_id, - &topic_id, - streaming::topics::helpers::get_stats(), - ); - let partitions = partition::create_and_insert_partitions_mem( - &streams, - &stream_id, - &topic_id, - parent_stats, - partitions_count, - config, - ); - for partition in partitions { - create_partition_file_hierarchy(stream.id(), topic.id(), partition.id(), config).await?; - - // Open the log - let start_offset = 0; - let segment = Segment::new( - start_offset, - config.segment.size, - config.segment.message_expiry, - ); - let messages_size = 0; - let indexes_size = 0; - let storage = create_segment_storage( - config, - stream.id(), - topic.id(), - partition.id(), - messages_size, - indexes_size, - start_offset, - ) - .await?; - - streams.with_partition_by_id_mut(&stream_id, &topic_id, partition.id(), |(.., log)| { - log.add_persisted_segment(segment, storage); - }); - } - - // Create a test task registry with dummy stop sender from ShardConnector - let connector: ShardConnector<()> = ShardConnector::new(shard_id); - let task_registry = Rc::new(TaskRegistry::new(shard_id, vec![connector.stop_sender])); - - Ok(BootstrapResult { - streams, - stream_id, - topic_id, - partition_id: 0, - task_registry, - }) -}
