This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ce3cf664e7bb422661f4d5679254e03dee027717 Author: Hubert Gruszecki <[email protected]> AuthorDate: Sun Mar 23 19:16:31 2025 +0100 another day, another refactor --- integration/tests/streaming/get_by_offset.rs | 289 +++++++++++------ integration/tests/streaming/get_by_timestamp.rs | 350 ++++++++++++++------- integration/tests/streaming/mod.rs | 2 +- sdk/src/models/messaging/indexes.rs | 179 ++--------- sdk/src/models/messaging/message.rs | 46 +-- sdk/src/models/messaging/messages_batch.rs | 167 ++++++---- server/src/streaming/partitions/messages.rs | 134 +++----- .../src/streaming/segments/indexes/index_reader.rs | 6 +- .../src/streaming/segments/indexes/index_writer.rs | 2 +- .../src/streaming/segments/indexes/indexes_mut.rs | 62 ++-- .../streaming/segments/messages/messages_writer.rs | 11 +- server/src/streaming/segments/messages/mod.rs | 13 +- .../src/streaming/segments/messages_accumulator.rs | 184 ++++++----- server/src/streaming/segments/reading_messages.rs | 87 +++-- .../streaming/segments/types/message_view_mut.rs | 7 - .../streaming/segments/types/messages_batch_mut.rs | 140 ++++++--- .../streaming/segments/types/messages_batch_set.rs | 42 ++- server/src/streaming/segments/writing_messages.rs | 83 ++--- server/src/streaming/systems/messages.rs | 2 +- 19 files changed, 980 insertions(+), 826 deletions(-) diff --git a/integration/tests/streaming/get_by_offset.rs b/integration/tests/streaming/get_by_offset.rs index 721a3e8e..7a67e6d9 100644 --- a/integration/tests/streaming/get_by_offset.rs +++ b/integration/tests/streaming/get_by_offset.rs @@ -15,11 +15,11 @@ use test_case::test_matrix; */ fn msg_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{}b", size)).unwrap() + IggyByteSize::from_str(&format!("{}B", size)).unwrap() } fn segment_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{}b", size)).unwrap() + IggyByteSize::from_str(&format!("{}B", size)).unwrap() } fn msgs_req_to_save(count: u32) -> u32 { @@ -34,23 +34,42 @@ fn index_cache_disabled() -> bool { false } +fn small_batches() -> Vec<u32> { + vec![3, 4, 5, 6, 7] +} + +fn medium_batches() -> Vec<u32> { + vec![10, 20, 30, 40] +} + +fn large_batches() -> Vec<u32> { + vec![100, 200, 300, 400] +} + +fn very_large_batches() -> Vec<u32> { + vec![500, 1000, 1500, 1000] +} + #[test_matrix( [msg_size(50), msg_size(1000), msg_size(20000)], - [msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000)], - [segment_size(500), segment_size(2000), segment_size(100000)], + [small_batches(), medium_batches(), large_batches(), very_large_batches()], + [msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(4000)], + [segment_size(500), segment_size(2000), segment_size(100000), segment_size(10000000)], [index_cache_disabled(), index_cache_enabled()])] #[tokio::test] async fn test_get_messages_by_offset( message_size: IggyByteSize, + batch_sizes: Vec<u32>, messages_required_to_save: u32, segment_size: IggyByteSize, index_cache_enabled: bool, ) { println!( - "Running test with messages_required_to_save: {}, segment_size: {}, message_size: {}, cache_indexes: {}", + "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}", + message_size, + batch_sizes, messages_required_to_save, segment_size, - message_size, index_cache_enabled ); @@ -59,9 +78,7 @@ async fn test_get_messages_by_offset( let topic_id = 1; let partition_id = 1; - // Define batch sizes for 5 appends - let batch_sizes = [3, 4, 5, 6, 7]; - 
let total_messages: u32 = batch_sizes.iter().sum(); + let total_messages_count = batch_sizes.iter().sum(); let config = Arc::new(SystemConfig { path: setup.config.path.to_string(), @@ -98,8 +115,10 @@ async fn test_get_messages_by_offset( setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); - let mut all_messages = Vec::with_capacity(total_messages as usize); - for i in 1..=total_messages { + let mut all_messages = Vec::with_capacity(total_messages_count as usize); + + // Generate all messages as defined in the test matrix + for i in 1..=total_messages_count { let id = i as u128; let beginning_of_payload = format!("message {}", i); let mut payload = BytesMut::new(); @@ -133,9 +152,22 @@ async fn test_get_messages_by_offset( let mut batch_offsets = Vec::with_capacity(batch_sizes.len()); let mut current_pos = 0; - // Append all batches - for batch_len in batch_sizes { - let messages_slice_to_append = &all_messages[current_pos..current_pos + batch_len as usize]; + // Append all batches as defined in the test matrix + for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() { + // If we've generated too many messages, skip the rest + if current_pos + batch_len as usize > all_messages.len() { + break; + } + + println!( + "Appending batch {}/{} with {} messages", + batch_idx + 1, + batch_sizes.len(), + batch_len + ); + + let batch_end_pos = current_pos + batch_len as usize; + let messages_slice_to_append = &all_messages[current_pos..batch_end_pos]; let messages_size = messages_slice_to_append .iter() @@ -150,49 +182,59 @@ async fn test_get_messages_by_offset( current_pos += batch_len as usize; } + // Use the exact total messages count from the test matrix + let total_sent_messages = total_messages_count; + // Test 1: All messages from start let all_loaded_messages = partition - .get_messages_by_offset(0, total_messages) + .get_messages_by_offset(0, total_sent_messages) .await .unwrap(); assert_eq!( all_loaded_messages.count(), - total_messages, + total_sent_messages, "Expected {} messages from start, but got {}", - total_messages, + total_sent_messages, all_loaded_messages.count() ); // Test 2: Get messages from middle (after 3rd batch) - let middle_offset = batch_offsets[2]; - let remaining_messages = total_messages - (batch_sizes[0] + batch_sizes[1] + batch_sizes[2]); - let middle_messages = partition - .get_messages_by_offset(middle_offset + 1, remaining_messages) - .await - .unwrap(); - assert_eq!( - middle_messages.count(), - remaining_messages, - "Expected {} messages from middle offset, but got {}", - remaining_messages, - middle_messages.count() - ); + if batch_offsets.len() >= 3 { + let middle_offset = batch_offsets[2]; + let prior_batches_sum: u32 = batch_sizes[..3].iter().sum(); + let remaining_messages = total_sent_messages - prior_batches_sum; + + let middle_messages = partition + .get_messages_by_offset(middle_offset + 1, remaining_messages) + .await + .unwrap(); + + assert_eq!( + middle_messages.count(), + remaining_messages, + "Expected {} messages from middle offset, but got {}", + remaining_messages, + middle_messages.count() + ); + } // Test 3: No messages beyond final offset - let final_offset = batch_offsets.last().unwrap(); - let no_messages = partition - .get_messages_by_offset(final_offset + 1, 1) - .await - .unwrap(); - assert_eq!( - no_messages.count(), - 0, - "Expected no messages beyond final offset, but got {}", - no_messages.count() - ); + if !batch_offsets.is_empty() { + let final_offset = 
*batch_offsets.last().unwrap(); + let no_messages = partition + .get_messages_by_offset(final_offset + 1, 1) + .await + .unwrap(); + assert_eq!( + no_messages.count(), + 0, + "Expected no messages beyond final offset, but got {}", + no_messages.count() + ); + } // Test 4: Small subset from start - let subset_size = 3; + let subset_size = std::cmp::min(3, total_sent_messages); let subset_messages = partition .get_messages_by_offset(0, subset_size) .await @@ -206,70 +248,109 @@ async fn test_get_messages_by_offset( ); // Test 5: Messages spanning multiple batches - let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch - let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch - let batches = partition - .get_messages_by_offset(span_offset, span_size) - .await - .unwrap(); - assert_eq!( - batches.count(), - span_size, - "Expected {} messages spanning multiple batches, but got {}", - span_size, - batches.count() - ); + if batch_offsets.len() >= 4 { + let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch + let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch + let batches = partition + .get_messages_by_offset(span_offset, span_size) + .await + .unwrap(); + assert_eq!( + batches.count(), + span_size, + "Expected {} messages spanning multiple batches, but got {}", + span_size, + batches.count() + ); - // Test 6: Validate message content and ordering - let mut i = 0; - for batch in batches.iter() { - for msg in batch.iter() { - println!( - "Message at position {}, offset: {}", - i, - msg.header().offset() - ); - let expected_offset = span_offset + i as u64; - assert!( - msg.header().offset() >= expected_offset, - "Message offset {} at position {} should be >= expected offset {}", - msg.header().offset(), - i, - expected_offset - ); - - let original_offset = msg.header().offset() as usize; - let original_message = &all_messages[original_offset]; - - let loaded_id = msg.header().id(); - let original_id = original_message.header.id; - assert_eq!( - loaded_id, - original_id, - "Message ID mismatch at offset {}", - msg.header().offset(), - ); - - let loaded_payload = msg.payload(); - let original_payload = &original_message.payload; - assert_eq!( - loaded_payload, - original_payload, - "Payload mismatch at offset {}", - msg.header().offset(), - ); - - let loaded_headers = msg.user_headers_map().unwrap().unwrap(); - let original_headers = - HashMap::from_bytes(original_message.user_headers.as_ref().unwrap().clone()) + // Test 6: Validate message content and ordering for all messages + let mut i = 0; + + for batch in batches.iter() { + for msg in batch.iter() { + let expected_offset = span_offset + i as u64; + assert!( + msg.header().offset() >= expected_offset, + "Message offset {} at position {} should be >= expected offset {}", + msg.header().offset(), + i, + expected_offset + ); + + let original_offset = msg.header().offset() as usize; + if original_offset < all_messages.len() { + let original_message = &all_messages[original_offset]; + + let loaded_id = msg.header().id(); + let original_id = original_message.header.id; + assert_eq!( + loaded_id, + original_id, + "Message ID mismatch at offset {}", + msg.header().offset(), + ); + + let loaded_payload = msg.payload(); + let original_payload = &original_message.payload; + assert_eq!( + loaded_payload, + original_payload, + "Payload mismatch at offset {}", + msg.header().offset(), + ); + + let loaded_headers = msg.user_headers_map().unwrap().unwrap(); + let original_headers = 
HashMap::from_bytes( + original_message.user_headers.as_ref().unwrap().clone(), + ) .unwrap(); - assert_eq!( - loaded_headers, - original_headers, - "Headers mismatch at offset {}", - msg.header().offset(), - ); - i += 1; + assert_eq!( + loaded_headers, + original_headers, + "Headers mismatch at offset {}", + msg.header().offset(), + ); + } + i += 1; + } } } + + // Add sequential read test for all batch sizes + println!( + "Verifying sequential reads, expecting {} messages", + total_sent_messages + ); + + let chunk_size = 500; + let mut verified_count = 0; + + for chunk_start in (0..total_sent_messages).step_by(chunk_size as usize) { + let read_size = std::cmp::min(chunk_size, total_sent_messages - chunk_start); + + let chunk = partition + .get_messages_by_offset(chunk_start as u64, read_size) + .await + .unwrap(); + + assert_eq!( + chunk.count(), + read_size, + "Failed to read chunk at offset {} with size {}", + chunk_start, + read_size + ); + + verified_count += chunk.count(); + + println!( + "Read chunk at offset {} with size {}, verified count: {}", + chunk_start, read_size, verified_count + ); + } + + assert_eq!( + verified_count, total_sent_messages, + "Sequential chunk reads didn't cover all messages" + ); } diff --git a/integration/tests/streaming/get_by_timestamp.rs b/integration/tests/streaming/get_by_timestamp.rs index 8241e819..d89e38ef 100644 --- a/integration/tests/streaming/get_by_timestamp.rs +++ b/integration/tests/streaming/get_by_timestamp.rs @@ -1,5 +1,6 @@ use crate::streaming::common::test_setup::TestSetup; use bytes::BytesMut; +use iggy::confirmation::Confirmation; use iggy::prelude::*; use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig}; use server::streaming::partitions::partition::Partition; @@ -16,11 +17,11 @@ use test_case::test_matrix; */ fn msg_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{}b", size)).unwrap() + IggyByteSize::from_str(&format!("{}B", size)).unwrap() } fn segment_size(size: u64) -> IggyByteSize { - IggyByteSize::from_str(&format!("{}b", size)).unwrap() + IggyByteSize::from_str(&format!("{}B", size)).unwrap() } fn msgs_req_to_save(count: u32) -> u32 { @@ -35,23 +36,42 @@ fn index_cache_disabled() -> bool { false } +fn small_batches() -> Vec<u32> { + vec![3, 4, 5, 6, 7] +} + +fn medium_batches() -> Vec<u32> { + vec![10, 20, 30, 40] +} + +fn large_batches() -> Vec<u32> { + vec![100, 200, 300, 400] +} + +fn very_large_batches() -> Vec<u32> { + vec![500, 1000, 1500, 1000] +} + #[test_matrix( [msg_size(50), msg_size(1000), msg_size(20000)], - [msgs_req_to_save(3), msgs_req_to_save(10), msgs_req_to_save(1000)], - [segment_size(500), segment_size(2000), segment_size(100000)], + [small_batches(), medium_batches(), large_batches(), very_large_batches()], + [msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(4000)], + [segment_size(500), segment_size(2000), segment_size(100000), segment_size(10000000)], [index_cache_disabled(), index_cache_enabled()])] #[tokio::test] async fn test_get_messages_by_timestamp( message_size: IggyByteSize, + batch_sizes: Vec<u32>, messages_required_to_save: u32, segment_size: IggyByteSize, index_cache_enabled: bool, ) { println!( - "Running test with messages_required_to_save: {}, segment_size: {}, message_size: {}, cache_indexes: {}", + "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}", + message_size, + batch_sizes, messages_required_to_save, segment_size, - message_size, 
index_cache_enabled ); @@ -60,9 +80,7 @@ async fn test_get_messages_by_timestamp( let topic_id = 1; let partition_id = 1; - // Define batch sizes for 5 appends - let batch_sizes = [3, 4, 5, 6, 7]; - let total_messages: u32 = batch_sizes.iter().sum(); + let total_messages_count = batch_sizes.iter().sum(); let config = Arc::new(SystemConfig { path: setup.config.path.to_string(), @@ -78,6 +96,7 @@ async fn test_get_messages_by_timestamp( }, ..Default::default() }); + let mut partition = Partition::create( stream_id, topic_id, @@ -98,8 +117,10 @@ async fn test_get_messages_by_timestamp( setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); - let mut all_messages = Vec::with_capacity(total_messages as usize); - for i in 1..=total_messages { + let mut all_messages = Vec::with_capacity(total_messages_count as usize); + + // Generate all messages as defined in the test matrix + for i in 1..=total_messages_count { let id = i as u128; let beginning_of_payload = format!("message {}", i); let mut payload = BytesMut::new(); @@ -120,6 +141,7 @@ async fn test_get_messages_by_timestamp( HeaderKey::new("key-3").unwrap(), HeaderValue::from_uint64(123456).unwrap(), ); + let message = IggyMessage::builder() .with_id(id) .with_payload(payload) @@ -128,65 +150,94 @@ async fn test_get_messages_by_timestamp( all_messages.push(message); } + // Timestamp tracking for messages let initial_timestamp = IggyTimestamp::now(); let mut batch_timestamps = Vec::with_capacity(batch_sizes.len()); let mut current_pos = 0; - // Append all batches with timestamps - for batch_len in batch_sizes { - let messages_slice_to_append = &all_messages[current_pos..current_pos + batch_len as usize]; + // Append all batches as defined in the test matrix with separate timestamps + for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() { + // Add a small delay between batches to ensure distinct timestamps + sleep(std::time::Duration::from_millis(2)); + + // If we've generated too many messages, skip the rest + if current_pos + batch_len as usize > all_messages.len() { + break; + } + + println!( + "Appending batch {}/{} with {} messages", + batch_idx + 1, + batch_sizes.len(), + batch_len + ); + + let batch_end_pos = current_pos + batch_len as usize; + let messages_slice_to_append = &all_messages[current_pos..batch_end_pos]; let messages_size = messages_slice_to_append .iter() - .map(|m| m.get_size_bytes().as_bytes_u32()) + .map(|m| m.get_size_bytes().as_bytes_u64() as u32) .sum(); let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size); assert_eq!(batch.count(), batch_len); - partition.append_messages(batch, None).await.unwrap(); + partition + .append_messages(batch, Some(Confirmation::Wait)) + .await + .unwrap(); + + // Capture the timestamp of this batch batch_timestamps.push(IggyTimestamp::now()); current_pos += batch_len as usize; - sleep(std::time::Duration::from_millis(10)); + + // Add a small delay between batches to ensure distinct timestamps + sleep(std::time::Duration::from_millis(2)); } let final_timestamp = IggyTimestamp::now(); + // Use the exact total messages count from the test matrix + let total_sent_messages = total_messages_count; + // Test 1: All messages from initial timestamp let all_loaded_messages = partition - .get_messages_by_timestamp(initial_timestamp, total_messages) + .get_messages_by_timestamp(initial_timestamp, total_sent_messages) .await .unwrap(); assert_eq!( all_loaded_messages.count(), - total_messages, + total_sent_messages, 
"Expected {} messages from initial timestamp, but got {}", - total_messages, + total_sent_messages, all_loaded_messages.count() ); // Test 2: Get messages from middle timestamp (after 3rd batch) - let middle_timestamp = batch_timestamps[2]; - // We expect to get all messages after the 3rd batch (this is where the test was failing) - let expected_messages = batch_sizes[3] + batch_sizes[4]; // Only these two batches remain after batch 2 - let remaining_count = total_messages - (batch_sizes[0] + batch_sizes[1] + batch_sizes[2]); - - // Use a timestamp that's 50us earlier than the recorded batch timestamp - // This ensures we catch the right batch boundary consistently - let adjusted_timestamp = (middle_timestamp.as_micros() - 50).into(); - - let middle_messages = partition - .get_messages_by_timestamp(adjusted_timestamp, remaining_count) - .await - .unwrap(); - assert_eq!( - middle_messages.count(), - expected_messages, - "Expected {} messages from middle timestamp, but got {}", - expected_messages, - middle_messages.count() - ); + if batch_timestamps.len() >= 3 { + // Use a timestamp that's just before the 3rd batch's timestamp to ensure we get messages + // from that batch onwards + let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000); + + // Calculate how many messages should be in batches after the 3rd + let prior_batches_sum: u32 = batch_sizes[..3].iter().sum(); + let remaining_messages = total_sent_messages - prior_batches_sum; + + let middle_messages = partition + .get_messages_by_timestamp(middle_timestamp, remaining_messages) + .await + .unwrap(); + + assert_eq!( + middle_messages.count(), + remaining_messages, + "Expected {} messages from middle timestamp, but got {}", + remaining_messages, + middle_messages.count() + ); + } - // Test 3: No messages from final timestamp + // Test 3: No messages after final timestamp let no_messages = partition .get_messages_by_timestamp(final_timestamp, 1) .await @@ -194,12 +245,12 @@ async fn test_get_messages_by_timestamp( assert_eq!( no_messages.count(), 0, - "Expected no messages from final timestamp, but got {}", + "Expected no messages after final timestamp, but got {}", no_messages.count() ); // Test 4: Small subset from initial timestamp - let subset_size = 3; + let subset_size = std::cmp::min(3, total_sent_messages); let subset_messages = partition .get_messages_by_timestamp(initial_timestamp, subset_size) .await @@ -212,77 +263,150 @@ async fn test_get_messages_by_timestamp( subset_messages.count() ); - println!("initial timestamp: {}", initial_timestamp.as_micros()); - - let mut i = 0; - for batch in subset_messages.iter() { - for message in batch.iter() { - let loaded_message = subset_messages.get(i).unwrap(); - let original_message = &all_messages[i]; - - let loaded_header = loaded_message.header(); - let original_header = &original_message.header; - - assert_eq!( - loaded_header.id(), - original_header.id, - "Message ID mismatch at position {}", - i - ); - assert_eq!( - loaded_message.payload(), - original_message.payload, - "Payload mismatch at position {}", - i - ); - let loaded_headers = msg.user_headers().map(|headers| headers.to_vec()); - let original_headers = original_message.user_headers.as_ref().unwrap().clone(); - assert_eq!( - HashMap::from_bytes(loaded_headers), - HashMap::from_bytes(original_headers), - "Headers mismatch at position {}", - i - ); - assert!( - loaded_message.header().timestamp() >= initial_timestamp.as_micros(), - "Message timestamp mismatch at position {}, timestamp {} is less 
than initial timestamp {}", - i, - loaded_message.header().timestamp(), - initial_timestamp - ); - - i += 1; - } - } - - // Test 5: Messages spanning multiple batches (from middle of 2nd batch timestamp) - let span_timestamp = batch_timestamps[1]; - let span_size = 8; - let spanning_messages = partition - .get_messages_by_timestamp(span_timestamp, span_size) - .await - .unwrap(); - assert_eq!( - spanning_messages.count(), - span_size, - "Expected {} messages spanning multiple batches, but got {}", - span_size, - spanning_messages.count() - ); + // Test 5: Messages spanning multiple batches by timestamp + if batch_timestamps.len() >= 4 { + // Use a timestamp that's just before the 2nd batch's timestamp + let span_timestamp = IggyTimestamp::from(batch_timestamps[1].as_micros() + 1000); + let span_size = 8; // Should span across multiple batches + + let spanning_messages = partition + .get_messages_by_timestamp(span_timestamp, span_size) + .await + .unwrap(); + + assert_eq!( + spanning_messages.count(), + span_size, + "Expected {} messages spanning multiple batches, but got {}", + span_size, + spanning_messages.count() + ); - // Make sure the timestamp we're using for comparison is accurate - let span_timestamp_micros = span_timestamp.as_micros(); - println!("Span timestamp in microseconds: {}", span_timestamp_micros); - - for batch in spanning_messages.iter() { - for msg in batch.iter() { - let msg_timestamp = msg.header().timestamp(); - assert!( - msg_timestamp >= span_timestamp_micros, - "Message timestamp {} should be >= span timestamp {}", - msg_timestamp, - span_timestamp_micros - ); + // Verify that all messages have timestamps >= our reference timestamp + let span_timestamp_micros = span_timestamp.as_micros(); + + // Test 6: Validate message content and ordering + for batch in spanning_messages.iter() { + for msg in batch.iter() { + let msg_timestamp = msg.header().timestamp(); + assert!( + msg_timestamp >= span_timestamp_micros, + "Message timestamp {} should be >= span timestamp {}", + msg_timestamp, + span_timestamp_micros + ); + + // Verify message content + let loaded_id = msg.header().id(); + let original_offset = msg.header().offset() as usize; + + if original_offset < all_messages.len() { + let original_message = &all_messages[original_offset]; + let original_id = original_message.header.id; + + assert_eq!( + loaded_id, + original_id, + "Message ID mismatch at offset {}", + msg.header().offset(), + ); + + let loaded_payload = msg.payload(); + let original_payload = &original_message.payload; + assert_eq!( + loaded_payload, + original_payload, + "Payload mismatch at offset {}", + msg.header().offset(), + ); + + let loaded_headers = msg.user_headers_map().unwrap().unwrap(); + let original_headers = HashMap::from_bytes( + original_message.user_headers.as_ref().unwrap().clone(), + ) + .unwrap(); + assert_eq!( + loaded_headers, + original_headers, + "Headers mismatch at offset {}", + msg.header().offset(), + ); + } + } } } + + // // Add sequential read test by timestamp + // println!( + // "Verifying sequential time-based reads, expecting {} messages", + // total_sent_messages + // ); + + // // For timestamp-based sequential reads, we'll use timestamps from batch_timestamps + // // array to retrieve messages in batches (which are part of returned batch_set) + // if !batch_timestamps.is_empty() { + // let chunk_size = 500; + // let mut verified_count = 0; + + // let mut current_timestamp = initial_timestamp; + + // while verified_count < total_sent_messages { + // let msgs_to_read = 
std::cmp::min(chunk_size, total_sent_messages - verified_count); + + // let batch_set = partition + // .get_messages_by_timestamp(current_timestamp, msgs_to_read) + // .await + // .unwrap(); + + // if batch_set.count() == 0 { + // println!( + // "No messages found for timestamp {}, expected {}", + // current_timestamp.as_micros(), + // msgs_to_read + // ); + // // We've read all messages, or there's a gap in the timestamps + // break; + // } + + // verified_count += batch_set.count(); + + // println!( + // "Read chunk from timestamp {} with size {}, verified count: {}, first offset {}, last offset {}", + // current_timestamp.as_micros(), + // batch_set.count(), + // verified_count, + // batch_set + // .iter() + // .next() + // .unwrap() + // .iter() + // .next() + // .unwrap() + // .header() + // .offset(), + // batch_set + // .iter() + // .last() + // .unwrap() + // .iter() + // .last() + // .unwrap() + // .header() + // .offset() + // ); + + // // Get the last message timestamp from the batch set and use it for the next query + // // Add a small offset to avoid getting the same message again + // if let Some(batch) = batch_set.iter().last() { + // if let Some(msg) = batch.iter().last() { + // current_timestamp = IggyTimestamp::from(msg.header().timestamp() + 1); + // } + // } + // } + + // assert_eq!( + // verified_count, total_sent_messages, + // "Sequential timestamp batch set reads didn't cover all messages" + // ); + // } } diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs index 12b37189..1799e9c3 100644 --- a/integration/tests/streaming/mod.rs +++ b/integration/tests/streaming/mod.rs @@ -4,7 +4,7 @@ use iggy::prelude::IggyMessage; mod common; // mod consumer_offset; mod get_by_offset; -// mod get_by_timestamp; +mod get_by_timestamp; // mod messages; // mod partition; // mod segment; diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs index 5a6cc179..420c1b78 100644 --- a/sdk/src/models/messaging/indexes.rs +++ b/sdk/src/models/messaging/indexes.rs @@ -28,20 +28,11 @@ impl IggyIndexes { } } - pub fn into_inner(self) -> Bytes { - self.buffer - } - /// Gets the number of indexes in the container pub fn count(&self) -> u32 { self.buffer.len() as u32 / INDEX_SIZE as u32 } - /// Checks if the container is empty - pub fn is_empty(&self) -> bool { - self.count() == 0 - } - /// Gets the size of all indexes messages pub fn messages_size(&self) -> u32 { self.last_position() - self.base_position @@ -63,30 +54,48 @@ impl IggyIndexes { } } + /// Gets a slice of the container pub fn slice_by_offset(&self, relative_start_offset: u32, count: u32) -> Option<IggyIndexes> { - let available_count = self.count().saturating_sub(relative_start_offset); + if self.count() == 0 || relative_start_offset >= self.count() { + return None; + } - let required_count = count; - let actual_count = std::cmp::min(required_count, available_count); + let available_count = self.count().saturating_sub(relative_start_offset); + let actual_count = std::cmp::min(count, available_count); - if actual_count == 0 || relative_start_offset >= self.count() { + if actual_count == 0 { return None; } let end_pos = relative_start_offset + actual_count; let start_byte = relative_start_offset as usize * INDEX_SIZE; let end_byte = end_pos as usize * INDEX_SIZE; + + if end_byte > self.buffer.len() { + return None; + } + let slice = self.buffer.slice(start_byte..end_byte); - let i = relative_start_offset.saturating_sub(1); - if i == 0 { - tracing::error!("IDX 0 
base_position: 0"); - Some(IggyIndexes::new(slice, 0)) + let base_position = if relative_start_offset > 0 { + match self.get(relative_start_offset - 1) { + Some(index) => index.position(), + None => self.base_position, + } } else { - let base_position = self.get(relative_start_offset - 1).unwrap().position(); - tracing::error!("base_position: {base_position}"); - Some(IggyIndexes::new(slice, base_position)) + self.base_position + }; + + Some(IggyIndexes::new(slice, base_position)) + } + + /// Gets a first index + pub fn first(&self) -> Option<IggyIndexView> { + if self.count() == 0 { + return None; } + + Some(IggyIndexView::new(&self.buffer[0..INDEX_SIZE])) } /// Gets a last index @@ -149,136 +158,6 @@ impl IggyIndexes { result } - // /// Calculate boundary information for reading messages from disk based on cached indexes. - // /// - // /// This computes the exact file position, bytes to read, and expected message count - // /// based on the provided relative offsets using in-memory index data. - // /// - // /// Returns None if the requested offset is out of bounds. - // pub fn calculate_cached_read_boundary_by_offset( - // &self, - // relative_start_offset: u32, - // relative_end_offset: u32, - // ) -> Option<ReadBoundary> { - // if self.count() == 0 { - // trace!("No indexes available in memory"); - // return None; - // } - - // if relative_start_offset >= self.count() { - // trace!( - // "Start offset {} is out of bounds. Total cached indexes: {}", - // relative_start_offset, - // self.count(), - // ); - // return None; - // } - - // let effective_end_offset = relative_end_offset.min(self.count() - 1); - - // tracing::error!( - // "start offset: {}, end offset: {}", - // relative_start_offset, - // effective_end_offset - // ); - - // // With our new index interpretation: - // // - For messages after the first one, the start position comes from the PREVIOUS index - // // - For the first message (offset 0), the start position is implicitly 0 - // // - The end position always comes from the current index - - // let start_position = if relative_start_offset > 0 { - // // For non-first messages, get start position from previous index - // let prev_index = self.get(relative_start_offset - 1)?; - // prev_index.position() - // } else { - // // For the first message, start position is 0 - // 0 - // }; - - // // The end position comes from the last index we want to read - // let last_index = self.get(effective_end_offset)?; - // let end_position = last_index.position(); - - // let bytes = end_position - start_position; - // let messages_count = effective_end_offset - relative_start_offset + 1; - - // trace!( - // "Calculated read boundary from cached indexes: start_pos={}, bytes={}, count={}", - // start_position, - // bytes, - // messages_count - // ); - - // Some(ReadBoundary::new(start_position, bytes, messages_count)) - // } - - // /// Calculate boundary information for reading messages from disk based on a timestamp. - // /// - // /// This finds the index with timestamp closest to the requested timestamp and returns - // /// the boundary information to read the messages from that point forward. - // /// - // /// Returns None if the timestamp is not found or there are no indexes. 
- // pub fn calculate_cached_read_boundary_by_timestamp( - // &self, - // timestamp: u64, - // messages_count: u32, - // ) -> Option<ReadBoundary> { - // let start_index = self.find_by_timestamp(timestamp)?; - - // tracing::trace!("Found start_index: {}", start_index); - - // let start_position_in_array = start_index.offset() - self.first_offset(); - // let end_position_in_array = - // (start_position_in_array + messages_count - 1).min(self.count() - 1); - - // tracing::trace!( - // "Calculated end_position_in_array: {}", - // end_position_in_array - // ); - // let end_index = self.get(end_position_in_array)?; - - // tracing::trace!("Found end_index: {:?}", end_index); - - // let start_position = start_index.position(); - // // Check to prevent overflow in case end_index.position() < start_position - // let bytes = if end_index.position() >= start_position { - // end_index.position() - start_position - // } else { - // tracing::warn!( - // "End index position {} is less than start position {}. Using 0 bytes.", - // end_index.position(), - // start_position - // ); - // 0 - // }; - - // // Ensure we don't have underflow when calculating messages count - // let actual_messages_count = if end_position_in_array >= start_position_in_array { - // end_position_in_array - start_position_in_array + 1 - // } else { - // tracing::warn!( - // "End position {} is less than start position {}. Using 1 message.", - // end_position_in_array, - // start_position_in_array - // ); - // 1 - // }; - - // trace!( - // "Calculated read boundary by timestamp: start_pos={}, bytes={}, count={}", - // start_position, - // bytes, - // actual_messages_count - // ); - - // Some(ReadBoundary::new( - // start_position, - // bytes, - // actual_messages_count, - // )) - // } - pub fn base_position(&self) -> u32 { self.base_position } diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs index 07c911d9..c18d9c6e 100644 --- a/sdk/src/models/messaging/message.rs +++ b/sdk/src/models/messaging/message.rs @@ -42,50 +42,6 @@ impl IggyMessage { IggyMessageBuilder::new() } - // /// Convert Bytes to messages - // pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> { - // let mut messages = Vec::with_capacity(count as usize); - // let mut position = 0; - // let buf_len = buffer.len(); - - // while position < buf_len { - // if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len { - // break; - // } - // let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize); - // let header = match IggyMessageHeader::from_bytes(header_bytes) { - // Ok(h) => h, - // Err(e) => { - // error!("Failed to parse message header: {}", e); - // return Err(e); - // } - // }; - // position += IGGY_MESSAGE_HEADER_SIZE as usize; - - // let payload_end = position + header.payload_length as usize; - // if payload_end > buf_len { - // break; - // } - // let payload = buffer.slice(position..payload_end); - // position = payload_end; - - // let headers: Option<Bytes> = if header.user_headers_length > 0 { - // Some(buffer.slice(position..position + header.user_headers_length as usize)) - // } else { - // None - // }; - // position += header.user_headers_length as usize; - - // messages.push(IggyMessage { - // header, - // payload, - // user_headers: headers, - // }); - // } - - // Ok(messages) - // } - /// Convert Bytes to messages pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> { let mut messages = 
Vec::with_capacity(count as usize); @@ -184,7 +140,7 @@ impl BytesSerializable for IggyMessage { bytes.put_slice(&message_header); bytes.put_slice(&self.payload); if let Some(headers) = &self.user_headers { - bytes.put_slice(&headers); + bytes.put_slice(headers); } bytes.freeze() } diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs index 64b3f27f..e8c044a4 100644 --- a/sdk/src/models/messaging/messages_batch.rs +++ b/sdk/src/models/messaging/messages_batch.rs @@ -1,4 +1,4 @@ -use super::{IggyIndexes, IggyMessageHeaderView, IggyMessageView, IggyMessageViewIterator}; +use super::{IggyIndexes, IggyMessageView, IggyMessageViewIterator}; use crate::{ error::IggyError, models::messaging::INDEX_SIZE, @@ -76,7 +76,9 @@ impl IggyMessagesBatch { let indexes_count = self.indexes.len() / INDEX_SIZE; debug_assert!( self.count as usize == indexes_count || self.count as usize == indexes_count - 1, - "Mismatch between message count and indexes count" + "Mismatch between message count and indexes count ({}) != {}", + self.count, + indexes_count ); self.count @@ -97,13 +99,17 @@ impl IggyMessagesBatch { &self.messages } + pub fn indexes_slice(&self) -> &[u8] { + &self.indexes + } + /// Decompose the batch into its components pub fn decompose(self) -> (IggyIndexes, Bytes, u32) { (self.indexes, self.messages, self.count) } /// Get index of first message - pub fn first_index(&self) -> u64 { + pub fn first_offset(&self) -> u64 { self.iter() .next() .map(|msg| msg.header().offset()) @@ -134,11 +140,19 @@ impl IggyMessagesBatch { .unwrap_or(0) } + /// Helper method to read a base position (u32) from the byte array at the given index + fn base_position_at(&self, position_index: u32) -> u32 { + tracing::error!("base_position = {}", self.indexes.base_position()); + if let Some(index) = self.indexes.get(position_index) { + index.position() - self.indexes.base_position() + } else { + 0 + } + } + /// Helper method to read a position (u32) from the byte array at the given index - fn read_position_at(&self, position_index: u32) -> u32 { - let idx = position_index * INDEX_SIZE as u32; - println!("idx={idx}"); - if let Some(index) = self.indexes.get(idx) { + fn position_at(&self, position_index: u32) -> u32 { + if let Some(index) = self.indexes.get(position_index) { index.position() } else { 0 @@ -152,46 +166,55 @@ impl IggyMessagesBatch { return None; } - let indexes_count = self.indexes.len() / INDEX_SIZE; - let mut pos = 0; - - for i in 0..indexes_count { - let msg_start = if i == 0 { - 0 - } else { - self.read_position_at(i as u32 - 1) as usize - }; + // TODO(hubcio): this can be optimized via binary search + let first_message_index = self + .iter() + .position(|msg| msg.header().offset() >= start_offset); - let msg_offset = IggyMessageHeaderView::new(&self.messages[msg_start..]).offset(); + first_message_index?; - if msg_offset >= start_offset { - pos = i; - break; - } + let first_message_index = first_message_index.unwrap(); + let last_message_index = + std::cmp::min(first_message_index + count as usize, self.count() as usize); - if i == indexes_count - 1 { - return None; - } - } + let sub_indexes = self.indexes.slice_by_offset( + first_message_index as u32, + (last_message_index - first_message_index) as u32, + )?; - let end_idx = std::cmp::min(pos + count as usize, indexes_count); - - let first_byte = if pos == 0 { + let first_message_position = if first_message_index == 0 { 0 } else { - self.read_position_at(pos as u32 - 1) as usize + 
self.position_at(first_message_index as u32 - 1) as usize + - self.indexes.base_position() as usize }; - let last_byte = self.read_position_at(end_idx as u32 - 1) as usize; - let sub_buffer = self.messages.slice(first_byte..last_byte); + let last_message_position = if last_message_index >= self.count() as usize { + self.messages.len() + } else { + self.position_at(last_message_index as u32 - 1) as usize + - self.indexes.base_position() as usize + }; + + debug_assert!( + first_message_position <= self.messages.len(), + "First message position {} exceeds buffer length {}", + first_message_position, + self.messages.len() + ); + debug_assert!( + last_message_position <= self.messages.len(), + "Last message position {} exceeds buffer length {}", + last_message_position, + self.messages.len() + ); - let sub_indexes = self - .indexes - .slice_by_offset(pos as u32, end_idx as u32) - .unwrap(); + let sub_buffer = self + .messages + .slice(first_message_position..last_message_position); Some(IggyMessagesBatch { - count: (end_idx - pos) as u32, + count: (last_message_index - first_message_index) as u32, indexes: sub_indexes, messages: sub_buffer, }) @@ -206,45 +229,55 @@ impl IggyMessagesBatch { return None; } - let indexes_count = self.indexes.len() / 4; - let mut pos = None; + // TODO(hubcio): this can be optimized via binary search + let first_message_index = self + .iter() + .position(|msg| msg.header().timestamp() >= timestamp); - for i in 0..indexes_count { - let msg_start = if i == 0 { - 0 - } else { - self.read_position_at(i as u32 - 1) as usize - }; - - let msg_ts = IggyMessageHeaderView::new(&self.messages[msg_start..]).timestamp(); - - if msg_ts >= timestamp { - pos = Some(i); - break; - } - } + first_message_index?; - let pos = pos?; + let first_message_index = first_message_index.unwrap(); + let last_message_index = + std::cmp::min(first_message_index + count as usize, self.count() as usize); - let end_idx = std::cmp::min(pos + count as usize, indexes_count); + let sub_indexes = self.indexes.slice_by_offset( + first_message_index as u32, + (last_message_index - first_message_index) as u32, + )?; - let first_byte = if pos == 0 { + let first_message_position = if first_message_index == 0 { 0 } else { - self.read_position_at(pos as u32 - 1) as usize + self.position_at(first_message_index as u32 - 1) as usize + - self.indexes.base_position() as usize }; - let last_byte = self.read_position_at(end_idx as u32 - 1) as usize; + let last_message_position = if last_message_index >= self.count() as usize { + self.messages.len() + } else { + self.position_at(last_message_index as u32 - 1) as usize + - self.indexes.base_position() as usize + }; - let sub_buffer = self.messages.slice(first_byte..last_byte); + debug_assert!( + first_message_position <= self.messages.len(), + "First message position {} exceeds buffer length {}", + first_message_position, + self.messages.len() + ); + debug_assert!( + last_message_position <= self.messages.len(), + "Last message position {} exceeds buffer length {}", + last_message_position, + self.messages.len() + ); - let sub_indexes = self - .indexes - .slice_by_offset(pos as u32, end_idx as u32) - .unwrap(); + let sub_buffer = self + .messages + .slice(first_message_position..last_message_position); Some(IggyMessagesBatch { - count: (end_idx - pos) as u32, + count: (last_message_index - first_message_index) as u32, indexes: sub_indexes, messages: sub_buffer, }) @@ -260,13 +293,13 @@ impl IggyMessagesBatch { let start_position = if index == 0 { 0 } else { - 
self.read_position_at(index as u32 - 1) as usize + self.position_at(index as u32 - 1) as usize }; let end_position = if index == self.count as usize - 1 { self.messages.len() } else { - self.read_position_at(index as u32) as usize + self.position_at(index as u32) as usize }; Some(IggyMessageView::new( @@ -294,13 +327,13 @@ impl Index<usize> for IggyMessagesBatch { let start_position = if index == 0 { 0 } else { - self.read_position_at(index as u32 - 1) as usize + self.position_at(index as u32 - 1) as usize }; let end_position = if index == self.count as usize - 1 { self.messages.len() } else { - self.read_position_at(index as u32) as usize + self.position_at(index as u32) as usize }; &self.messages[start_position..end_position] diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 48b9370c..4dc3ef4c 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -27,14 +27,12 @@ impl Partition { let query_ts = timestamp.as_micros(); - // Filter segments that may contain messages with timestamp >= query_ts let filtered_segments: Vec<&Segment> = self .segments .iter() .filter(|segment| segment.end_timestamp() >= query_ts) .collect(); - // Use a dedicated method to retrieve messages across segments by timestamp Self::get_messages_from_segments_by_timestamp(filtered_segments, query_ts, count).await } @@ -50,23 +48,22 @@ impl Partition { self.current_offset ); - if self.segments.is_empty() || start_offset > self.current_offset { + if self.segments.is_empty() || start_offset > self.current_offset || count == 0 { return Ok(IggyMessagesBatchSet::empty()); } - let end_offset = self.get_end_offset(start_offset, count); - - // TODO: Most likely don't need to find the specific range of segments, just find the first segment containing the first offset - // and during reads roll to the next one, when the first is exhausted. - let segments = self.filter_segments_by_offsets(start_offset, end_offset); - let out = match segments.len() { - 0 => panic!("TODO"), - 1 => Ok(segments[0] - .get_messages_by_offset(start_offset, count) - .await?), - _ => Ok(Self::get_messages_from_segments(segments, start_offset, count).await?), - }?; - Ok(out) + let start_segment_idx = match self + .segments + .iter() + .rposition(|segment| segment.start_offset() <= start_offset) + { + Some(idx) => idx, + None => return Ok(IggyMessagesBatchSet::empty()), + }; + + let relevant_segments: Vec<&Segment> = self.segments[start_segment_idx..].iter().collect(); + + Self::get_messages_from_segments(relevant_segments, start_offset, count).await } // Retrieves the first messages (up to a specified count). @@ -128,30 +125,6 @@ impl Partition { self.get_messages_by_offset(offset, count).await } - fn get_end_offset(&self, offset: u64, count: u32) -> u64 { - let mut end_offset = offset + (count - 1) as u64; - let segment = self.segments.last().unwrap(); - let max_offset = segment.end_offset(); - if end_offset > max_offset { - end_offset = max_offset; - } - - end_offset - } - - fn filter_segments_by_offsets(&self, start_offset: u64, end_offset: u64) -> Vec<&Segment> { - let slice_start = self - .segments - .iter() - .rposition(|segment| segment.start_offset() <= start_offset) - .unwrap_or(0); - - self.segments[slice_start..] - .iter() - .filter(|segment| segment.start_offset() <= end_offset) - .collect() - } - /// Retrieves messages from multiple segments. 
async fn get_messages_from_segments( segments: Vec<&Segment>, @@ -161,21 +134,22 @@ impl Partition { let mut remaining_count = count; let mut current_offset = offset; let mut batches = IggyMessagesBatchSet::empty(); + for segment in segments { if remaining_count == 0 { break; } let messages = segment - .get_messages_by_offset(current_offset, remaining_count) - .await - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \ - offset: {}, count: {}", - segment, current_offset, remaining_count - ) - })?; + .get_messages_by_offset(current_offset, remaining_count) + .await + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \ + offset: {}, count: {}", + segment, current_offset, remaining_count + ) + })?; let messages_count = messages.count(); if messages_count == 0 { @@ -184,37 +158,16 @@ impl Partition { remaining_count = remaining_count.saturating_sub(messages_count); - if messages_count > 0 { + if let Some(last_offset) = messages.last_offset() { + current_offset = last_offset + 1; + } else if messages_count > 0 { current_offset += messages_count as u64; } + batches.add_batch_set(messages); } Ok(batches) - // let mut results = Vec::new(); - // let mut remaining_count = count; - // for segment in segments { - // if remaining_count == 0 { - // break; - // } - // let slices = segment - // .get_messages_by_offset(offset, remaining_count) - // .await - // .with_error_context(|error| { - // format!( - // "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \ - // offset: {}, count: {}", - // segment, offset, remaining_count - // ) - // })?; - // let messages_count = slices - // .iter() - // .map(|slice| slice.header.last_offset_delta) - // .sum(); - // remaining_count = remaining_count.saturating_sub(messages_count); - // results.extend(slices); - // } - // Ok(results) } /// Retrieves messages from multiple segments by timestamp. @@ -242,7 +195,6 @@ impl Partition { ) })?; - // Update remaining count for the next segment let messages_count = messages.count(); remaining_count = remaining_count.saturating_sub(messages_count); @@ -263,19 +215,18 @@ impl Partition { messages.size(), self.partition_id ); - { - let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; - if last_segment.is_closed() { - let start_offset = last_segment.end_offset() + 1; - trace!( + + let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; + if last_segment.is_closed() { + let start_offset = last_segment.end_offset() + 1; + trace!( "Current segment is closed, creating new segment with start offset: {} for partition with ID: {}...", start_offset, self.partition_id ); - self.add_persisted_segment(start_offset).await.with_error_context(|error| format!( + self.add_persisted_segment(start_offset).await.with_error_context(|error| format!( "{COMPONENT} (error: {error}) - failed to add persisted segment, partition: {}, start offset: {}", self, start_offset, ))? 
- } } let current_offset = if !self.should_increment_offset { @@ -317,8 +268,6 @@ impl Partition { } */ - println!("Current offset before appending batch: {}", current_offset); - let messages_count = { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; last_segment @@ -338,8 +287,6 @@ impl Partition { current_offset + messages_count as u64 - 1 }; - println!("Last offset after appending batch: {}", last_offset); - if self.should_increment_offset { self.current_offset = last_offset; } else { @@ -348,20 +295,19 @@ impl Partition { } self.unsaved_messages_count += messages_count; + + let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; + if self.unsaved_messages_count >= self.config.partition.messages_required_to_save + || last_segment.is_full().await { - let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; - if self.unsaved_messages_count >= self.config.partition.messages_required_to_save - || last_segment.is_full().await - { - trace!( + trace!( "Segment with start offset: {} for partition with ID: {} will be persisted on disk...", last_segment.start_offset(), self.partition_id ); - last_segment.persist_messages(confirmation).await.unwrap(); - self.unsaved_messages_count = 0; - } + last_segment.persist_messages(confirmation).await.unwrap(); + self.unsaved_messages_count = 0; } Ok(()) diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs index 955fd4bb..98b4fd52 100644 --- a/server/src/streaming/segments/indexes/index_reader.rs +++ b/server/src/streaming/segments/indexes/index_reader.rs @@ -302,11 +302,7 @@ impl IndexReader { } } - if low > 0 { - Ok(Some(low - 1)) - } else { - Ok(Some(0)) - } + Ok(Some(low)) } /// Returns the size of the index file in bytes. 
diff --git a/server/src/streaming/segments/indexes/index_writer.rs b/server/src/streaming/segments/indexes/index_writer.rs index d3094a9f..a4e0eb43 100644 --- a/server/src/streaming/segments/indexes/index_writer.rs +++ b/server/src/streaming/segments/indexes/index_writer.rs @@ -87,7 +87,7 @@ impl IndexWriter { self.index_size_bytes .fetch_add(indexes.len() as u64, Ordering::Release); - trace!("Saved {count} indexes to file: {}", self.file_path); + trace!("XXXX Saved {count} indexes to file: {}", self.file_path); Ok(()) } diff --git a/server/src/streaming/segments/indexes/indexes_mut.rs b/server/src/streaming/segments/indexes/indexes_mut.rs index ff0f3af1..535676d7 100644 --- a/server/src/streaming/segments/indexes/indexes_mut.rs +++ b/server/src/streaming/segments/indexes/indexes_mut.rs @@ -8,7 +8,7 @@ use std::ops::{Deref, Index as StdIndex}; #[derive(Default, Clone)] pub struct IggyIndexesMut { buffer: BytesMut, - unsaved_count: u32, + saved_count: u32, } impl IggyIndexesMut { @@ -16,7 +16,7 @@ impl IggyIndexesMut { pub fn empty() -> Self { Self { buffer: BytesMut::new(), - unsaved_count: 0, + saved_count: 0, } } @@ -24,7 +24,7 @@ impl IggyIndexesMut { pub fn from_bytes(indexes: BytesMut) -> Self { Self { buffer: indexes, - unsaved_count: 0, + saved_count: 0, } } @@ -32,18 +32,13 @@ impl IggyIndexesMut { pub fn with_capacity(capacity: usize) -> Self { Self { buffer: BytesMut::with_capacity(capacity * INDEX_SIZE), - unsaved_count: 0, + saved_count: 0, } } /// Makes the indexes immutable - pub fn make_immutable(self) -> IggyIndexes { - IggyIndexes::new(self.buffer.freeze(), self.unsaved_count) - } - - /// Sets the number of unsaved messages - pub fn set_unsaved_messages_count(&mut self, count: u32) { - self.unsaved_count = count; + pub fn make_immutable(self, base_position: u32) -> IggyIndexes { + IggyIndexes::new(self.buffer.freeze(), base_position) } /// Inserts a new index at the end of buffer @@ -53,10 +48,9 @@ impl IggyIndexesMut { self.buffer.put_u64_le(timestamp); } - /// Appends another IggyIndexesMut instance to this one. Other indexes buffer is consumed. - pub fn concatenate(&mut self, other: IggyIndexesMut) { - self.buffer.put_slice(&other.buffer); - self.unsaved_count += other.unsaved_count; + /// Appends another slice of indexes to this one. + pub fn concatenate(&mut self, other: &[u8]) { + self.buffer.put_slice(other); } /// Gets the number of indexes in the container @@ -168,27 +162,26 @@ impl IggyIndexesMut { result } - /// Gets a slice of the buffer containing the last N unsaved indexes. - pub fn get_unsaved_indexes(&self) -> &[u8] { - if self.count() == 0 || self.unsaved_count == 0 { - tracing::error!("No unsaved indexes"); - return &[]; - } - - let start_idx = (self.count() - self.unsaved_count) as usize * 16; + /// Clears the container, removing all indexes + pub fn clear(&mut self) { + self.saved_count = 0; + self.buffer.clear(); + } - &self.buffer[start_idx..] + /// Gets the number of unsaved indexes + pub fn unsaved_count(&self) -> u32 { + self.count().saturating_sub(self.saved_count) } - /// Mark all unsaved indexes as saved - pub fn mark_saved(&mut self) { - self.unsaved_count = 0; + /// Gets the unsaved part of the index buffer + pub fn unsaved_slice(&self) -> &[u8] { + let start_pos = self.saved_count as usize * INDEX_SIZE; + &self.buffer[start_pos..] 
} - /// Clears the container, removing all indexes - pub fn clear(&mut self) { - self.buffer.clear(); - self.unsaved_count = 0; + /// Mark all indexes as saved to disk + pub fn mark_saved(&mut self) { + self.saved_count = self.count(); } /// Slices the container to return a view of a specific range of indexes @@ -281,11 +274,7 @@ impl IggyIndexesMut { } } - if low > 0 { - Some(low - 1) - } else { - Some(0) - } + Some(low) } } @@ -317,7 +306,6 @@ impl fmt::Debug for IggyIndexesMut { writeln!(f, "IggyIndexesMut {{")?; writeln!(f, " count: {},", count)?; - writeln!(f, " unsaved_count: {},", self.unsaved_count)?; writeln!(f, " indexes: [")?; for i in 0..count { diff --git a/server/src/streaming/segments/messages/messages_writer.rs b/server/src/streaming/segments/messages/messages_writer.rs index c730fdc4..9d0cdd44 100644 --- a/server/src/streaming/segments/messages/messages_writer.rs +++ b/server/src/streaming/segments/messages/messages_writer.rs @@ -84,10 +84,17 @@ impl MessagesWriter { confirmation: Confirmation, ) -> Result<IggyByteSize, IggyError> { let messages_size = batch_set.size(); + let messages_count = batch_set.count(); + let containers_count = batch_set.containers_count(); trace!( - "Saving batch of size {messages_size} bytes to messages file: {}", + "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", self.file_path ); + + for container in batch_set.iter() { + tracing::error!("Container size: {}", container.size()); + } + match confirmation { Confirmation::Wait => { if let Some(ref mut file) = self.file { @@ -108,7 +115,7 @@ impl MessagesWriter { self.messages_size_bytes .fetch_add(messages_size as u64, Ordering::AcqRel); trace!( - "Written batch of size {messages_size} bytes to messages file: {}", + "Written batch of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", self.file_path ); if self.fsync { diff --git a/server/src/streaming/segments/messages/mod.rs b/server/src/streaming/segments/messages/mod.rs index 9480de2a..0642fdbb 100644 --- a/server/src/streaming/segments/messages/mod.rs +++ b/server/src/streaming/segments/messages/mod.rs @@ -21,16 +21,21 @@ async fn write_batch( let mut slices: Vec<IoSlice> = batches.iter().map(|b| IoSlice::new(b)).collect(); let slices = &mut slices.as_mut_slice(); - let mut written = 0; + let mut total_written = 0; + while !slices.is_empty() { - written += file + let bytes_written = file .write_vectored(slices) .await .with_error_context(|error| { format!("Failed to write messages to file: {file_path}, error: {error}",) }) .map_err(|_| IggyError::CannotWriteToFile)?; - IoSlice::advance_slices(slices, written); + + total_written += bytes_written; + + IoSlice::advance_slices(slices, bytes_written); } - Ok(written) + + Ok(total_written) } diff --git a/server/src/streaming/segments/messages_accumulator.rs b/server/src/streaming/segments/messages_accumulator.rs index b0d7ae35..1d11a1a7 100644 --- a/server/src/streaming/segments/messages_accumulator.rs +++ b/server/src/streaming/segments/messages_accumulator.rs @@ -2,44 +2,54 @@ use super::{ indexes::IggyIndexesMut, types::{IggyMessagesBatchMut, IggyMessagesBatchSet}, }; -use iggy::utils::timestamp::IggyTimestamp; +use tracing::trace; -/// A container that accumulates messages before they are written to disk +/// A container that accumulates messages in memory before they are written to disk. 
+/// +/// The accumulator serves as a staging area for messages, allowing them to be +/// collected and prepared for persistence. It maintains metadata like offsets, +/// timestamps, and positions to ensure correct ordering and indexing. #[derive(Debug, Default)] pub struct MessagesAccumulator { - /// Base offset of the first message + /// Base offset of the first message in the accumulator base_offset: u64, - /// Base timestamp of the first message - base_timestamp: u64, - - /// Current offset + /// Current (latest) offset in the accumulator current_offset: u64, - /// Current position + /// Current byte position for the next message in the segment current_position: u32, - /// Current timestamp - current_timestamp: u64, - - /// A buffer containing all accumulated messages + /// Collection of all message batches in the accumulator batches: IggyMessagesBatchSet, - /// Number of messages in the accumulator + /// Total number of messages in the accumulator messages_count: u32, } impl MessagesAccumulator { - /// Coalesces a batch of messages into the accumulator - /// This method also prepares the messages for persistence by setting - /// their offset, timestamp, record size, and checksum fields - /// Returns the number of messages in the batch + /// Adds a batch of messages to the accumulator and prepares them for persistence. + /// + /// This method assigns offsets, timestamps, and positions to the messages + /// and updates the batch's indexes accordingly. It ensures message continuity by + /// managing offsets to prevent gaps or overlaps. + /// + /// # Arguments + /// + /// * `segment_start_offset` - The segment's starting offset + /// * `segment_current_offset` - The suggested starting offset for this batch + /// * `segment_current_position` - The current byte position in the segment + /// * `batch` - The batch of messages to add + /// + /// # Returns + /// + /// The number of messages successfully added to the accumulator pub fn coalesce_batch( &mut self, - start_offset: u64, - current_offset: u64, - current_position: u32, - indexes: &mut IggyIndexesMut, + segment_start_offset: u64, + segment_current_offset: u64, + segment_current_position: u32, batch: IggyMessagesBatchMut, ) -> u32 { let batch_messages_count = batch.count(); @@ -48,68 +58,69 @@ impl MessagesAccumulator { return 0; } - println!("Coalescing batch: base_offset={}, current_offset={}, messages_count={}, batch_count={}", - self.base_offset, current_offset, self.messages_count, batch_messages_count); + trace!( + "Coalescing batch base_offset={}, segment_current_offset={}, messages_count={}, batch_count={}", + self.base_offset, + segment_current_offset, + self.messages_count, + batch_messages_count + ); - if self.batches.is_empty() { - self.base_offset = current_offset; - self.base_timestamp = IggyTimestamp::now().as_micros(); - self.current_offset = current_offset; - self.current_timestamp = self.base_timestamp; - self.current_position = current_position; - } else { - // Important: If we already have messages, the current_offset should be the correct - // starting point for this batch. - // If current_offset is greater than our current highest offset + 1, use it as is. - // Otherwise, use our highest offset + 1 to ensure no overlap.
- let next_expected_offset = self.current_offset + 1; - if current_offset > next_expected_offset { - self.current_offset = current_offset; - } else { - self.current_offset = next_expected_offset; - } - } + self.initialize_or_update_offsets(segment_current_offset, segment_current_position); - let batch = batch.prepare_for_persistence( - start_offset, + let prepared_batch = batch.prepare_for_persistence( + segment_start_offset, self.current_offset, self.current_position, - indexes, ); - let batch_size = batch.size(); - self.batches.add_batch(batch); + let batch_size = prepared_batch.size(); + + self.batches.add_batch(prepared_batch); self.messages_count += batch_messages_count; - // Update current_offset to be the last offset in this batch self.current_offset = self.base_offset + self.messages_count as u64 - 1; - - println!( - "After coalescing: base_offset={}, current_offset={}, messages_count={}", - self.base_offset, self.current_offset, self.messages_count - ); - - self.current_timestamp = self - .batches - .iter() - .last() - .unwrap() - .iter() - .last() - .unwrap() - .header() - .timestamp(); self.current_position += batch_size; batch_messages_count } - /// Gets messages from the accumulator based on start offset and count + /// Initialize accumulator state for the first batch or update offsets for subsequent batches + fn initialize_or_update_offsets(&mut self, current_offset: u64, current_position: u32) { + if self.batches.is_empty() { + self.base_offset = current_offset; + self.current_offset = current_offset; + self.current_position = current_position; + } else { + let next_expected_offset = self.current_offset + 1; + self.current_offset = current_offset.max(next_expected_offset); + } + } + + /// Retrieves messages from the accumulator based on start offset and count. + /// + /// # Arguments + /// + /// * `start_offset` - The starting offset to retrieve messages from + /// * `count` - Maximum number of messages to retrieve + /// + /// # Returns + /// + /// A batch set containing the requested messages pub fn get_messages_by_offset(&self, start_offset: u64, count: u32) -> IggyMessagesBatchSet { self.batches.get_by_offset(start_offset, count) } - /// Gets messages from the accumulator based on start timestamp and count + /// Retrieves messages from the accumulator based on start timestamp and count. + /// + /// # Arguments + /// + /// * `start_timestamp` - The earliest timestamp to retrieve messages from + /// * `count` - Maximum number of messages to retrieve + /// + /// # Returns + /// + /// A batch set containing the requested messages pub fn get_messages_by_timestamp( &self, start_timestamp: u64, @@ -118,38 +129,57 @@ impl MessagesAccumulator { self.batches.get_by_timestamp(start_timestamp, count) } - /// Checks if the accumulator is empty + /// Checks if the accumulator is empty (has no messages). pub fn is_empty(&self) -> bool { self.batches.is_empty() || self.messages_count == 0 } - /// Returns the number of unsaved messages + /// Returns the number of messages in the accumulator that have not been persisted. pub fn unsaved_messages_count(&self) -> usize { self.messages_count as usize } - /// Returns the maximum offset in the accumulator + /// Returns the highest offset in the accumulator. pub fn last_offset(&self) -> u64 { self.current_offset } - /// Returns the maximum timestamp in the accumulator + /// Returns the timestamp of the last message in the accumulator. 
pub fn last_timestamp(&self) -> u64 { - self.current_timestamp + self.batches.last_timestamp().unwrap_or(0) } - /// Returns the base offset of the accumulator - pub fn base_offset(&self) -> u64 { + /// Returns the starting offset of the first message in the accumulator. + pub fn first_offset(&self) -> u64 { self.base_offset } - /// Returns the base timestamp of the accumulator - pub fn base_timestamp(&self) -> u64 { - self.base_timestamp + /// Returns the timestamp of the first message in the accumulator. + pub fn first_timestamp(&self) -> u64 { + self.batches.first_timestamp().unwrap_or(0) + } + + /// Updates the segment's indexes with index information from all batches in this accumulator. + /// + /// This method transfers all the index information from the accumulator's batches to the + /// provided segment indexes object, ensuring that message positions, offsets, and timestamps + /// are properly recorded for later retrieval. + /// + /// # Arguments + /// + /// * `segment_indexes` - The segment's mutable indexes to update + pub fn update_indexes(&self, segment_indexes: &mut IggyIndexesMut) { + for batch in self.batches.iter() { + segment_indexes.concatenate(batch.indexes_slice()); + } + + trace!("Updated segment indexes, total count: {}", segment_indexes.count()); } - /// Takes ownership of the accumulator and returns the messages and indexes - pub fn materialize(self) -> IggyMessagesBatchSet { + /// Consumes the accumulator and returns the contained message batches. + /// + /// This is typically called when it's time to persist the accumulated messages to disk. + pub fn into_batch_set(self) -> IggyMessagesBatchSet { self.batches } } diff --git a/server/src/streaming/segments/reading_messages.rs b/server/src/streaming/segments/reading_messages.rs index 2248c5be..fdf4deaa 100644 --- a/server/src/streaming/segments/reading_messages.rs +++ b/server/src/streaming/segments/reading_messages.rs @@ -15,10 +15,6 @@ impl Segment { } pub fn get_messages_count(&self) -> u32 { - if self.get_messages_size() == 0 { - return 0; - } - (self.end_offset - self.start_offset + 1) as u32 } @@ -43,9 +39,11 @@ impl Segment { .await; } - let accumulator_first_timestamp = self.accumulator.base_timestamp(); + let accumulator_first_timestamp = self.accumulator.first_timestamp(); let accumulator_last_timestamp = self.accumulator.last_timestamp(); + trace!("Accumulator first timestamp: {accumulator_first_timestamp}, last timestamp: {accumulator_last_timestamp}"); + // Case 1: Requested timestamp is higher than any available timestamp if timestamp > accumulator_last_timestamp { return Ok(IggyMessagesBatchSet::empty()); @@ -107,44 +105,89 @@ impl Segment { end_offset = self.end_offset; } + trace!( + "Getting messages by offset: {}, count: {}, segment start_offset: {}, segment end_offset: {}", + offset, + count, + self.start_offset, + self.end_offset + ); + // Case 0: Accumulator is empty, so all messages have to be on disk if self.accumulator.is_empty() { return self.load_messages_from_disk_by_offset(offset, count).await; } - let accumulator_first_msg_offset = self.accumulator.base_offset(); + let accumulator_first_msg_offset = self.accumulator.first_offset(); let accumulator_last_msg_offset = self.accumulator.last_offset(); // Case 1: All messages are in accumulator buffer if offset >= accumulator_first_msg_offset && end_offset <= accumulator_last_msg_offset { + trace!( + "Segment has {} messages, getting all from accumulator", + self.get_messages_count() + ); return
Ok(self.accumulator.get_messages_by_offset(offset, count)); } // Case 2: All messages are on disk if end_offset < accumulator_first_msg_offset { + trace!( + "Segment has {} messages, getting all from disk", + self.get_messages_count() + ); return self.load_messages_from_disk_by_offset(offset, count).await; } // Case 3: Messages span disk and accumulator buffer boundary + // Calculate how many messages we need from disk + let disk_count = if offset < accumulator_first_msg_offset { + ((accumulator_first_msg_offset - offset) as u32).min(count) + } else { + 0 + }; - // Load messages from disk up to the accumulator buffer boundary - let mut messages = self - .load_messages_from_disk_by_offset(offset, (accumulator_first_msg_offset - offset) as u32) - .await.with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, end offset :{}", - self.stream_id, self.topic_id, self.partition_id, accumulator_first_msg_offset - 1 - ))?; - - // Load remaining messages from accumulator buffer - let buffer_start = std::cmp::max(offset, accumulator_first_msg_offset); - let buffer_count = (end_offset - buffer_start + 1) as u32; - let buffer_messages = self - .accumulator - .get_messages_by_offset(buffer_start, buffer_count); + let mut combined_batch_set = IggyMessagesBatchSet::empty(); - messages.add_batch_set(buffer_messages); + // Load messages from disk if needed + if disk_count > 0 { + let disk_messages = self + .load_messages_from_disk_by_offset(offset, disk_count) + .await + .with_error_context(|error| { + format!( + "STREAMING_SEGMENT (error: {error}) - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, count: {disk_count}", + self.stream_id, self.topic_id, self.partition_id + ) + })?; + + if !disk_messages.is_empty() { + combined_batch_set.add_batch_set(disk_messages); + } + } + + // Calculate how many more messages we need from the accumulator + let remaining_count = count - combined_batch_set.count(); + + trace!( + "Segment has {} messages, remaining count: {}", + self.get_messages_count(), + remaining_count + ); + + if remaining_count > 0 { + let accumulator_start_offset = std::cmp::max(offset, accumulator_first_msg_offset); + + let accumulator_messages = self + .accumulator + .get_messages_by_offset(accumulator_start_offset, remaining_count); + + if !accumulator_messages.is_empty() { + combined_batch_set.add_batch_set(accumulator_messages); + } + } - Ok(messages) + Ok(combined_batch_set) } /// Loads and returns all message IDs from the log file.
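For illustration, a minimal standalone sketch of the disk/accumulator split used by the new get_messages_by_offset path above; the split_read helper is hypothetical (not part of the Segment API) and ignores the case where fewer messages than requested are actually found on disk:

// Sketch only: models how a read request is divided between disk and the in-memory accumulator.
fn split_read(offset: u64, count: u32, accumulator_first_offset: u64) -> (u32, u32) {
    // Messages strictly below the accumulator's first offset can only come from disk.
    let disk_count = if offset < accumulator_first_offset {
        ((accumulator_first_offset - offset) as u32).min(count)
    } else {
        0
    };
    // Whatever remains is served from the accumulator buffer.
    (disk_count, count - disk_count)
}

fn main() {
    // 10 messages starting at offset 95 while the accumulator starts at 100:
    // the first 5 come from disk, the remaining 5 from memory.
    assert_eq!(split_read(95, 10, 100), (5, 5));
    // A request that is fully buffered needs nothing from disk.
    assert_eq!(split_read(120, 4, 100), (0, 4));
}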
diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs index 4a82ebd2..ca283dec 100644 --- a/server/src/streaming/segments/types/message_view_mut.rs +++ b/server/src/streaming/segments/types/message_view_mut.rs @@ -16,11 +16,6 @@ pub struct IggyMessageViewMut<'a> { impl<'a> IggyMessageViewMut<'a> { /// Create a new mutable message view from a buffer pub fn new(buffer: &'a mut [u8]) -> Self { - // let (payload_len, headers_len) = { - // let hdr_slice = &buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize]; - // let hdr_view = IggyMessageHeaderView::new(hdr_slice); - // (hdr_view.payload_length(), hdr_view.headers_length()) - // }; Self { buffer, payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize, @@ -89,7 +84,6 @@ impl LendingIterator for IggyMessageViewMutIterator<'_> { return None; } - // Make sure we have enough bytes for at least a header if self.position + IGGY_MESSAGE_HEADER_SIZE as usize > self.buffer.len() { tracing::error!( "Buffer too small for message header at position {}, buffer len: {}", @@ -103,7 +97,6 @@ impl LendingIterator for IggyMessageViewMutIterator<'_> { let buffer_slice = &mut self.buffer[self.position..]; let view = IggyMessageViewMut::new(buffer_slice); - // Safety check: Make sure we're advancing let message_size = view.size(); if message_size == 0 { tracing::error!( diff --git a/server/src/streaming/segments/types/messages_batch_mut.rs b/server/src/streaming/segments/types/messages_batch_mut.rs index de318d33..f8e5acea 100644 --- a/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/server/src/streaming/segments/types/messages_batch_mut.rs @@ -3,16 +3,21 @@ use crate::streaming::segments::indexes::IggyIndexesMut; use bytes::{BufMut, BytesMut}; use iggy::models::messaging::{IggyMessagesBatch, INDEX_SIZE}; use iggy::prelude::*; +use iggy::utils::timestamp::IggyTimestamp; use lending_iterator::prelude::*; use std::ops::Deref; +use tracing::trace; -/// A container for mutable messages +/// A container for mutable messages that are being prepared for persistence. +/// +/// `IggyMessagesBatchMut` holds both the raw message data in a `BytesMut` buffer +/// and the corresponding index data that allows for efficient message lookup. #[derive(Debug, Default)] pub struct IggyMessagesBatchMut { - /// The byte-indexes of messages in the buffer, represented as array of u32's - /// Each index points to the END position of a message (start of next message) + /// The index data for all messages in the buffer indexes: IggyIndexesMut, - /// The buffer containing the messages + + /// The buffer containing the serialized message data messages: BytesMut, } @@ -23,20 +28,35 @@ impl Sizeable for IggyMessagesBatchMut { } impl IggyMessagesBatchMut { - /// Create an empty messages container with bytes capacity to prevent realloc() + /// Creates an empty messages container with the specified capacity to avoid reallocations. + /// + /// # Arguments + /// + /// * `bytes_capacity` - The expected total size of all messages in bytes pub fn with_capacity(bytes_capacity: usize) -> Self { + let index_capacity = bytes_capacity / INDEX_SIZE + 1; // Add 1 to avoid rounding down to 0 Self { - indexes: IggyIndexesMut::with_capacity(bytes_capacity / INDEX_SIZE), + indexes: IggyIndexesMut::with_capacity(index_capacity), messages: BytesMut::with_capacity(bytes_capacity), } } - /// Create a new messages container from a existing buffer of bytes + /// Creates a new messages container from existing index and message buffers. 
+ /// + /// # Arguments + /// + /// * `indexes` - Preprocessed index data + /// * `messages` - Serialized message data pub fn from_indexes_and_messages(indexes: IggyIndexesMut, messages: BytesMut) -> Self { Self { indexes, messages } } - /// Create a new messages container from a slice of messages + /// Creates a new messages container from a slice of IggyMessage objects. + /// + /// # Arguments + /// + /// * `messages` - Slice of message objects to store + /// * `messages_size` - Total size of all messages in bytes pub fn from_messages(messages: &[IggyMessage], messages_size: u32) -> Self { let mut messages_buffer = BytesMut::with_capacity(messages_size as usize); let mut indexes_buffer = IggyIndexesMut::with_capacity(messages.len()); @@ -52,92 +72,109 @@ impl IggyMessagesBatchMut { Self::from_indexes_and_messages(indexes_buffer, messages_buffer) } - /// Create a lending iterator over mutable messages + /// Creates a lending iterator that yields mutable views of messages. pub fn iter_mut(&mut self) -> IggyMessageViewMutIterator { IggyMessageViewMutIterator::new(&mut self.messages) } - /// Create a lending iterator over immutable messages + /// Creates an iterator that yields immutable views of messages. pub fn iter(&self) -> IggyMessageViewIterator { IggyMessageViewIterator::new(&self.messages) } - /// Get the number of messages + /// Returns the number of messages in the batch. pub fn count(&self) -> u32 { - debug_assert_eq!(self.indexes.len() % INDEX_SIZE, 0); - self.indexes.len() as u32 / INDEX_SIZE as u32 + let index_count = self.indexes.len() as u32 / INDEX_SIZE as u32; + debug_assert_eq!( + self.indexes.len() % INDEX_SIZE, + 0, + "Index buffer length must be a multiple of INDEX_SIZE" + ); + index_count } - /// Get the total size of all messages in bytes + /// Returns the total size of all messages in bytes. pub fn size(&self) -> u32 { self.messages.len() as u32 } - /// Makes the messages batch immutable - pub fn make_immutable(self) -> IggyMessagesBatch { - let messages_count = self.count(); - let indexes = self.indexes.make_immutable(); - let messages = self.messages.freeze(); - IggyMessagesBatch::new(indexes, messages, messages_count) - } - - /// Prepares all messages in the batch for persistence by setting their offsets, timestamps, - /// and other necessary fields. Returns the prepared, immutable messages. + /// Prepares all messages in the batch for persistence by setting their offsets, + /// timestamps, and other necessary fields. 
+ /// + /// # Arguments + /// + /// * `start_offset` - The starting offset of the segment + /// * `base_offset` - The base offset for this batch of messages + /// * `current_position` - The current position in the segment + /// + /// # Returns + /// + /// An immutable `IggyMessagesBatch` ready for persistence pub fn prepare_for_persistence( self, start_offset: u64, base_offset: u64, current_position: u32, - segment_indexes: &mut IggyIndexesMut, ) -> IggyMessagesBatch { let messages_count = self.count(); + if messages_count == 0 { + return IggyMessagesBatch::empty(); + } + let timestamp = IggyTimestamp::now().as_micros(); - let mut curr_abs_offset = base_offset; - let mut current_position = current_position; let (mut indexes, mut messages) = self.decompose(); + + let mut curr_abs_offset = base_offset; + let mut curr_position = current_position; + let mut iter = IggyMessageViewMutIterator::new(&mut messages); let mut curr_rel_offset: u32 = 0; - // TODO: fix crash when idx cache is disabled while let Some(mut message) = iter.next() { message.header_mut().set_offset(curr_abs_offset); message.header_mut().set_timestamp(timestamp); message.update_checksum(); - current_position += message.size() as u32; + let message_size = message.size() as u32; + curr_position += message_size; - indexes.set_offset_at(curr_rel_offset, curr_abs_offset as u32); - indexes.set_position_at(curr_rel_offset, current_position); + let relative_offset = (curr_abs_offset - start_offset) as u32; + indexes.set_offset_at(curr_rel_offset, relative_offset); + indexes.set_position_at(curr_rel_offset, curr_position); indexes.set_timestamp_at(curr_rel_offset, timestamp); - println!( - "Message {} offset: {}, position: {}, timestamp: {}", - curr_rel_offset, curr_abs_offset, current_position, timestamp - ); - curr_abs_offset += 1; curr_rel_offset += 1; } - indexes.set_unsaved_messages_count(messages_count); - segment_indexes.concatenate(indexes); + IggyMessagesBatch::new( + indexes.make_immutable(current_position), + messages.freeze(), + messages_count, + ) + } - let relative_offset = base_offset - start_offset; - let indexes = segment_indexes - .slice_by_offset( - relative_offset as u32, - relative_offset as u32 + messages_count, - ) - .unwrap(); + /// Returns the first timestamp in the batch + pub fn first_timestamp(&self) -> u64 { + IggyMessageView::new(&self.messages).header().timestamp() + } - IggyMessagesBatch::new(indexes, messages.freeze(), messages_count) + /// Returns the last timestamp in the batch + pub fn last_timestamp(&self) -> u64 { + let last_message_offset = self.indexes.get(self.count() - 1).unwrap().offset(); + IggyMessageView::new(&self.messages[last_message_offset as usize..]) + .header() + .timestamp() } + /// Checks if the batch is empty. pub fn is_empty(&self) -> bool { self.count() == 0 } + /// Decomposes the batch into its constituent parts.
pub fn decompose(self) -> (IggyIndexesMut, BytesMut) { (self.indexes, self.messages) } @@ -176,6 +213,11 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { let remaining = &self.buffer[self.position..]; if remaining.len() < IGGY_MESSAGE_HEADER_SIZE as usize { + trace!( + "Buffer too small for message header at position {}, buffer len: {}", + self.position, + self.buffer.len() + ); return None; } @@ -185,6 +227,12 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { + IGGY_MESSAGE_HEADER_SIZE as usize; if message_size > remaining.len() { + trace!( + "Message size {} exceeds remaining buffer size {} at position {}", + message_size, + remaining.len(), + self.position + ); return None; } diff --git a/server/src/streaming/segments/types/messages_batch_set.rs b/server/src/streaming/segments/types/messages_batch_set.rs index 1593da92..34b9840c 100644 --- a/server/src/streaming/segments/types/messages_batch_set.rs +++ b/server/src/streaming/segments/types/messages_batch_set.rs @@ -76,6 +76,16 @@ impl IggyMessagesBatchSet { self.batches.is_empty() || self.count == 0 } + /// Get first timestamp of first batch + pub fn first_timestamp(&self) -> Option<u64> { + self.batches.first().map(|batch| batch.first_timestamp()) + } + + /// Get first offset of first batch + pub fn first_offset(&self) -> Option<u64> { + self.batches.first().map(|batch| batch.first_offset()) + } + /// Get offset of last message of last batch pub fn last_offset(&self) -> Option<u64> { self.batches.last().map(|batch| batch.last_offset()) @@ -109,24 +119,43 @@ impl IggyMessagesBatchSet { if self.is_empty() || count == 0 { return Self::empty(); } + tracing::trace!( + "Getting {} messages from batch set, start offset {}, end offset calculated {}, end offset real {}, messages count {}...", + count, + start_offset, + start_offset + count as u64 - 1, + self.last_offset().unwrap_or(0), + self.count() + ); let mut result = Self::with_capacity(self.containers_count()); let mut remaining_count = count; for container in self.iter() { + tracing::trace!( + "Batch set container has {} messages, first offset {}, last offset {}", + container.count(), + container.first_offset(), + container.last_offset() + ); if remaining_count == 0 { break; } - let first_offset = container.first_index(); + let first_offset = container.first_offset(); if first_offset + container.count() as u64 <= start_offset { continue; } if let Some(sliced) = container.slice_by_offset(start_offset, remaining_count) { if sliced.count() > 0 { + tracing::trace!( + "Taking {} messages from container", + sliced.count() + ); remaining_count -= sliced.count(); result.add_batch(sliced); + tracing::trace!("Remaining count: {}", remaining_count); } } } @@ -151,6 +180,17 @@ impl IggyMessagesBatchSet { break; } + let first_timestamp = container.first_timestamp(); + if first_timestamp < timestamp { + tracing::trace!( + "Skipping container with {} messages, first timestamp {}, requested timestamp {}", + container.count(), + first_timestamp, + timestamp + ); + continue; + } + if let Some(sliced) = container.slice_by_timestamp(timestamp, remaining_count) { if sliced.count() > 0 { remaining_count -= sliced.count(); diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs index 3004681c..4d41b2ee 100644 --- a/server/src/streaming/segments/writing_messages.rs +++ b/server/src/streaming/segments/writing_messages.rs @@ -25,12 +25,11 @@ impl Segment { self.start_offset, current_offset, self.last_index_position, - &mut
self.indexes, messages, ); if self.end_offset == 0 { - self.start_timestamp = messages_accumulator.base_timestamp(); + self.start_timestamp = messages_accumulator.first_timestamp(); } self.end_timestamp = messages_accumulator.last_timestamp(); self.end_offset = messages_accumulator.last_offset(); @@ -62,51 +61,14 @@ impl Segment { let accumulator = std::mem::take(&mut self.accumulator); - let batches = accumulator.materialize(); + accumulator.update_indexes(&mut self.indexes); + + let batches = accumulator.into_batch_set(); let confirmation = match confirmation { Some(val) => val, None => self.config.segment.server_confirmation, }; - let saved_bytes = self.save_message_batches(batches, confirmation).await?; - - self.save_indexes().await?; - - self.check_and_handle_segment_full().await?; - - let saved_messages_count = unsaved_messages_count; - - trace!( - "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", - saved_messages_count, - self.start_offset, - self.partition_id, - saved_bytes - ); - - Ok(saved_messages_count) - } - - fn update_counters(&mut self, messages_size: u64, messages_count: u64) { - self.size_of_parent_stream - .fetch_add(messages_size, Ordering::AcqRel); - self.size_of_parent_topic - .fetch_add(messages_size, Ordering::AcqRel); - self.size_of_parent_partition - .fetch_add(messages_size, Ordering::AcqRel); - self.messages_count_of_parent_stream - .fetch_add(messages_count, Ordering::SeqCst); - self.messages_count_of_parent_topic - .fetch_add(messages_count, Ordering::SeqCst); - self.messages_count_of_parent_partition - .fetch_add(messages_count, Ordering::SeqCst); - } - - async fn save_message_batches( - &mut self, - batches: IggyMessagesBatchSet, - confirmation: Confirmation, - ) -> Result<IggyByteSize, IggyError> { let batch_size = batches.size(); let batch_count = batches.count(); @@ -124,12 +86,7 @@ impl Segment { self.last_index_position += saved_bytes.as_bytes_u64() as u32; - Ok(saved_bytes) - } - - async fn save_indexes(&mut self) -> Result<(), IggyError> { - let unsaved_indexes_slice = self.indexes.get_unsaved_indexes(); - + let unsaved_indexes_slice = self.indexes.unsaved_slice(); self.index_writer .as_mut() .expect("Index writer not initialized") @@ -145,10 +102,38 @@ impl Segment { self.indexes.mark_saved(); if !self.config.segment.cache_indexes { + trace!("Clearing indexes cache"); self.indexes.clear(); } - Ok(()) + self.check_and_handle_segment_full().await?; + + let saved_messages_count = unsaved_messages_count; + + trace!( + "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", + saved_messages_count, + self.start_offset, + self.partition_id, + saved_bytes + ); + + Ok(saved_messages_count) + } + + fn update_counters(&mut self, messages_size: u64, messages_count: u64) { + self.size_of_parent_stream + .fetch_add(messages_size, Ordering::AcqRel); + self.size_of_parent_topic + .fetch_add(messages_size, Ordering::AcqRel); + self.size_of_parent_partition + .fetch_add(messages_size, Ordering::AcqRel); + self.messages_count_of_parent_stream + .fetch_add(messages_count, Ordering::SeqCst); + self.messages_count_of_parent_topic + .fetch_add(messages_count, Ordering::SeqCst); + self.messages_count_of_parent_partition + .fetch_add(messages_count, Ordering::SeqCst); } async fn check_and_handle_segment_full(&mut self) -> Result<(), IggyError> { diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/systems/messages.rs index
4ebc0d90..0b452dc7 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -168,7 +168,7 @@ impl System { } } } - let indexes = indexes.make_immutable(); + let indexes = indexes.make_immutable(0); let decrypted_messages = decrypted_messages.freeze(); let decrypted_batch = IggyMessagesBatch::new(indexes, decrypted_messages, count); decrypted_batches.push(decrypted_batch);
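For illustration, a self-contained model of the saved_count bookkeeping that replaces unsaved_count in IggyIndexesMut; IndexesModel is a hypothetical stand-in, and the 16-byte entry layout (offset, position, timestamp) mirrors the index format used above:

const INDEX_SIZE: usize = 16;

#[derive(Default)]
struct IndexesModel {
    buffer: Vec<u8>,
    saved_count: u32,
}

impl IndexesModel {
    fn count(&self) -> u32 {
        (self.buffer.len() / INDEX_SIZE) as u32
    }

    // Appends one 16-byte index entry: offset (u32), position (u32), timestamp (u64).
    fn insert(&mut self, offset: u32, position: u32, timestamp: u64) {
        self.buffer.extend_from_slice(&offset.to_le_bytes());
        self.buffer.extend_from_slice(&position.to_le_bytes());
        self.buffer.extend_from_slice(&timestamp.to_le_bytes());
    }

    // The tail of the buffer that still has to be flushed to the index file.
    fn unsaved_slice(&self) -> &[u8] {
        &self.buffer[self.saved_count as usize * INDEX_SIZE..]
    }

    // After a successful flush, everything currently in the buffer counts as saved.
    fn mark_saved(&mut self) {
        self.saved_count = self.count();
    }
}

fn main() {
    let mut indexes = IndexesModel::default();
    indexes.insert(0, 100, 1);
    indexes.insert(1, 200, 2);
    assert_eq!(indexes.unsaved_slice().len(), 2 * INDEX_SIZE);
    indexes.mark_saved();
    indexes.insert(2, 300, 3);
    // Only the entry appended after mark_saved() remains to be written.
    assert_eq!(indexes.unsaved_slice().len(), INDEX_SIZE);
}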
