This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_remaining_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1c025a22fcc20825b92f8b92f9348b7817530877 Author: numminex <[email protected]> AuthorDate: Mon Oct 27 12:49:50 2025 +0100 feat(io_uring): fix remaining tests --- .../consumer_offsets/get_consumer_offset.rs | 34 ++- .../consumer_offsets/store_consumer_offset.rs | 38 ++-- core/common/src/commands/messages/poll_messages.rs | 43 ++-- core/common/src/types/args/mod.rs | 6 +- core/integration/tests/sdk/producer/mod.rs | 2 +- .../server/scenarios/delete_segments_scenario.rs | 14 +- .../server/scenarios/message_size_scenario.rs | 14 +- .../handlers/segments/delete_segments_handler.rs | 84 ++++++-- core/server/src/shard/mod.rs | 21 +- core/server/src/shard/system/segments.rs | 26 +-- core/server/src/shard/system/streams.rs | 231 +-------------------- core/server/src/shard/transmission/event.rs | 6 - core/server/src/shard/transmission/frame.rs | 1 + core/server/src/shard/transmission/message.rs | 3 + 14 files changed, 169 insertions(+), 354 deletions(-) diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs index a98ac6c4d..8382025d1 100644 --- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs @@ -82,21 +82,24 @@ impl BytesSerializable for GetConsumerOffset { let stream_id_bytes = self.stream_id.to_bytes(); let topic_id_bytes = self.topic_id.to_bytes(); let mut bytes = BytesMut::with_capacity( - 4 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), + 5 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), ); bytes.put_slice(&consumer_bytes); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); + // Encode partition_id with a flag byte: 1 = Some, 0 = None if let Some(partition_id) = self.partition_id { + bytes.put_u8(1); bytes.put_u32_le(partition_id); } else { - bytes.put_u32_le(0); + bytes.put_u8(0); + bytes.put_u32_le(0); // Padding to keep structure consistent } bytes.freeze() } fn from_bytes(bytes: Bytes) -> Result<GetConsumerOffset, IggyError> { - if bytes.len() < 15 { + if bytes.len() < 16 { return Err(IggyError::InvalidCommand); } @@ -112,15 +115,17 @@ impl BytesSerializable for GetConsumerOffset { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; position += topic_id.get_size_bytes().as_bytes_usize(); - let partition_id = u32::from_le_bytes( - bytes[position..position + 4] + // Decode partition_id with flag byte: 1 = Some, 0 = None + let has_partition_id = bytes[position]; + let partition_id_value = u32::from_le_bytes( + bytes[position + 1..position + 5] .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - let partition_id = if partition_id == 0 { - None + let partition_id = if has_partition_id == 1 { + Some(partition_id_value) } else { - Some(partition_id) + None }; let command = GetConsumerOffset { consumer, @@ -171,13 +176,19 @@ mod tests { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); position += topic_id.get_size_bytes().as_bytes_usize(); - let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); + let has_partition_id = bytes[position]; + let partition_id = u32::from_le_bytes(bytes[position + 1..position + 5].try_into().unwrap()); + let partition_id = if has_partition_id == 1 { + Some(partition_id) + } else { + None + }; assert!(!bytes.is_empty()); assert_eq!(consumer, command.consumer); assert_eq!(stream_id, command.stream_id); assert_eq!(topic_id, command.topic_id); - assert_eq!(Some(partition_id), command.partition_id); + assert_eq!(partition_id, command.partition_id); } #[test] @@ -191,11 +202,12 @@ mod tests { let stream_id_bytes = stream_id.to_bytes(); let topic_id_bytes = topic_id.to_bytes(); let mut bytes = BytesMut::with_capacity( - 4 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), + 5 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), ); bytes.put_slice(&consumer_bytes); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); + bytes.put_u8(1); // Flag: partition_id is Some bytes.put_u32_le(partition_id); let command = GetConsumerOffset::from_bytes(bytes.freeze()); diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs index 9500138d1..9f07f3ccf 100644 --- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs @@ -81,22 +81,25 @@ impl BytesSerializable for StoreConsumerOffset { let stream_id_bytes = self.stream_id.to_bytes(); let topic_id_bytes = self.topic_id.to_bytes(); let mut bytes = BytesMut::with_capacity( - 12 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), + 13 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), ); bytes.put_slice(&consumer_bytes); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); + // Encode partition_id with a flag byte: 1 = Some, 0 = None if let Some(partition_id) = self.partition_id { + bytes.put_u8(1); bytes.put_u32_le(partition_id); } else { - bytes.put_u32_le(0); + bytes.put_u8(0); + bytes.put_u32_le(0); // Padding to keep structure consistent } bytes.put_u64_le(self.offset); bytes.freeze() } fn from_bytes(bytes: Bytes) -> Result<StoreConsumerOffset, IggyError> { - if bytes.len() < 23 { + if bytes.len() < 24 { return Err(IggyError::InvalidCommand); } @@ -112,18 +115,20 @@ impl BytesSerializable for StoreConsumerOffset { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; position += topic_id.get_size_bytes().as_bytes_usize(); - let partition_id = u32::from_le_bytes( - bytes[position..position + 4] + // Decode partition_id with flag byte: 1 = Some, 0 = None + let has_partition_id = bytes[position]; + let partition_id_value = u32::from_le_bytes( + bytes[position + 1..position + 5] .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - let partition_id = if partition_id == 0 { - None + let partition_id = if has_partition_id == 1 { + Some(partition_id_value) } else { - Some(partition_id) + None }; let offset = u64::from_le_bytes( - bytes[position + 4..position + 12] + bytes[position + 5..position + 13] .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); @@ -179,14 +184,20 @@ mod tests { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); position += topic_id.get_size_bytes().as_bytes_usize(); - let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); - let offset = u64::from_le_bytes(bytes[position + 4..position + 12].try_into().unwrap()); + let has_partition_id = bytes[position]; + let partition_id = u32::from_le_bytes(bytes[position + 1..position + 5].try_into().unwrap()); + let partition_id = if has_partition_id == 1 { + Some(partition_id) + } else { + None + }; + let offset = u64::from_le_bytes(bytes[position + 5..position + 13].try_into().unwrap()); assert!(!bytes.is_empty()); assert_eq!(consumer, command.consumer); assert_eq!(stream_id, command.stream_id); assert_eq!(topic_id, command.topic_id); - assert_eq!(Some(partition_id), command.partition_id); + assert_eq!(partition_id, command.partition_id); assert_eq!(offset, command.offset); } @@ -202,11 +213,12 @@ mod tests { let stream_id_bytes = stream_id.to_bytes(); let topic_id_bytes = topic_id.to_bytes(); let mut bytes = BytesMut::with_capacity( - 12 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), + 13 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), ); bytes.put_slice(&consumer_bytes); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); + bytes.put_u8(1); // Flag: partition_id is Some bytes.put_u32_le(partition_id); bytes.put_u64_le(offset); diff --git a/core/common/src/commands/messages/poll_messages.rs b/core/common/src/commands/messages/poll_messages.rs index e51b03ef6..dbbca1bb1 100644 --- a/core/common/src/commands/messages/poll_messages.rs +++ b/core/common/src/commands/messages/poll_messages.rs @@ -76,7 +76,7 @@ impl PollMessages { let topic_id_bytes = topic_id.to_bytes(); let strategy_bytes = strategy.to_bytes(); let mut bytes = BytesMut::with_capacity( - 9 + consumer_bytes.len() + 10 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len() + strategy_bytes.len(), @@ -84,10 +84,13 @@ impl PollMessages { bytes.put_slice(&consumer_bytes); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); + // Encode partition_id with a flag byte: 1 = Some, 0 = None if let Some(partition_id) = partition_id { + bytes.put_u8(1); bytes.put_u32_le(partition_id); } else { - bytes.put_u32_le(0); + bytes.put_u8(0); + bytes.put_u32_le(0); // Padding to keep structure consistent } bytes.put_slice(&strategy_bytes); bytes.put_u32_le(count); @@ -149,7 +152,7 @@ impl BytesSerializable for PollMessages { } fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { - if bytes.len() < 29 { + if bytes.len() < 30 { return Err(IggyError::InvalidCommand); } @@ -165,17 +168,20 @@ impl BytesSerializable for PollMessages { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; position += topic_id.get_size_bytes().as_bytes_usize(); - let partition_id = u32::from_le_bytes( - bytes[position..position + 4] + // Decode partition_id with flag byte: 1 = Some, 0 = None + let has_partition_id = bytes[position]; + let partition_id_value = u32::from_le_bytes( + bytes[position + 1..position + 5] .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - let partition_id = match partition_id { - 0 => None, - partition_id => Some(partition_id), + let partition_id = if has_partition_id == 1 { + Some(partition_id_value) + } else { + None }; - let polling_kind = PollingKind::from_code(bytes[position + 4])?; - position += 5; + let polling_kind = PollingKind::from_code(bytes[position + 5])?; + position += 6; let value = u64::from_le_bytes( bytes[position..position + 8] .try_into() @@ -254,9 +260,15 @@ mod tests { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); position += topic_id.get_size_bytes().as_bytes_usize(); - let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); - let polling_kind = PollingKind::from_code(bytes[position + 4]).unwrap(); - position += 5; + let has_partition_id = bytes[position]; + let partition_id = u32::from_le_bytes(bytes[position + 1..position + 5].try_into().unwrap()); + let partition_id = if has_partition_id == 1 { + Some(partition_id) + } else { + None + }; + let polling_kind = PollingKind::from_code(bytes[position + 5]).unwrap(); + position += 6; let value = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap()); let strategy = PollingStrategy { kind: polling_kind, @@ -270,7 +282,7 @@ mod tests { assert_eq!(consumer, command.consumer); assert_eq!(stream_id, command.stream_id); assert_eq!(topic_id, command.topic_id); - assert_eq!(Some(partition_id), command.partition_id); + assert_eq!(partition_id, command.partition_id); assert_eq!(strategy, command.strategy); assert_eq!(count, command.count); assert_eq!(auto_commit, command.auto_commit); @@ -291,7 +303,7 @@ mod tests { let topic_id_bytes = topic_id.to_bytes(); let strategy_bytes = strategy.to_bytes(); let mut bytes = BytesMut::with_capacity( - 9 + consumer_bytes.len() + 10 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len() + strategy_bytes.len(), @@ -299,6 +311,7 @@ mod tests { bytes.put_slice(&consumer_bytes); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); + bytes.put_u8(1); // Flag: partition_id is Some bytes.put_u32_le(partition_id); bytes.put_slice(&strategy_bytes); bytes.put_u32_le(count); diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs index 11f93194e..cc22f9439 100644 --- a/core/common/src/types/args/mod.rs +++ b/core/common/src/types/args/mod.rs @@ -408,11 +408,11 @@ impl Default for Args { quic_max_concurrent_bidi_streams: 10000, quic_datagram_send_buffer_size: 100000, quic_initial_mtu: 1200, - quic_send_window: 100000, - quic_receive_window: 100000, + quic_send_window: 1000000, + quic_receive_window: 1000000, quic_response_buffer_size: 1048576, quic_keep_alive_interval: 5000, - quic_max_idle_timeout: 10000, + quic_max_idle_timeout: 100000, quic_validate_certificate: false, quic_heartbeat_interval: "5s".to_string(), websocket_server_address: "127.0.0.1:8092".to_string(), diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs index 455fec872..74e194198 100644 --- a/core/integration/tests/sdk/producer/mod.rs +++ b/core/integration/tests/sdk/producer/mod.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use iggy::clients::client::IggyClient; use iggy::prelude::*; -const PARTITION_ID: u32 = 1; +const PARTITION_ID: u32 = 0; const STREAM_NAME: &str = "test-stream-producer"; const TOPIC_NAME: &str = "test-topic-producer"; const PARTITIONS_COUNT: u32 = 3; diff --git a/core/integration/tests/server/scenarios/delete_segments_scenario.rs b/core/integration/tests/server/scenarios/delete_segments_scenario.rs index a35c5bf1e..a5959e5b5 100644 --- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs +++ b/core/integration/tests/server/scenarios/delete_segments_scenario.rs @@ -20,11 +20,9 @@ use iggy::prelude::*; use integration::test_server::{ClientFactory, TestServer}; use std::fs::{DirEntry, read_dir}; -const STREAM_ID: u32 = 1; const STREAM_NAME: &str = "test_stream"; -const TOPIC_ID: u32 = 1; const TOPIC_NAME: &str = "test_topic"; -const PARTITION_ID: u32 = 1; +const PARTITION_ID: u32 = 0; const LOG_EXTENSION: &str = "log"; pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer) { @@ -36,13 +34,14 @@ pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer) { .await .unwrap(); - client.create_stream(STREAM_NAME).await.unwrap(); + let stream = client.create_stream(STREAM_NAME).await.unwrap(); + let stream_id = stream.id; - client + let topic = client .create_topic( &Identifier::named(STREAM_NAME).unwrap(), TOPIC_NAME, - PARTITION_ID, + 1, CompressionAlgorithm::None, None, IggyExpiry::NeverExpire, @@ -50,6 +49,7 @@ pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer) { ) .await .unwrap(); + let topic_id = topic.id; // Send 5 large messages to create multiple segments let large_payload = "A".repeat(1024 * 1024); @@ -79,7 +79,7 @@ pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer) { // Check initial segment count on filesystem let data_path = test_server.get_local_data_path(); let partition_path = - format!("{data_path}/streams/{STREAM_ID}/topics/{TOPIC_ID}/partitions/{PARTITION_ID}"); + format!("{data_path}/streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"); let initial_segments = get_segment_paths_for_partition(&partition_path); println!( diff --git a/core/integration/tests/server/scenarios/message_size_scenario.rs b/core/integration/tests/server/scenarios/message_size_scenario.rs index a6430f59a..1eb3bf94d 100644 --- a/core/integration/tests/server/scenarios/message_size_scenario.rs +++ b/core/integration/tests/server/scenarios/message_size_scenario.rs @@ -22,8 +22,6 @@ use integration::test_server::{ClientFactory, assert_clean_system, login_root}; use std::collections::HashMap; use std::str::FromStr; -const STREAM_ID: u32 = 1; -const TOPIC_ID: u32 = 1; const STREAM_NAME: &str = "test-stream"; const TOPIC_NAME: &str = "test-topic"; const PARTITIONS_COUNT: u32 = 3; @@ -93,8 +91,8 @@ async fn assert_message_count(client: &IggyClient, expected_count: u32) { // 4. Poll messages and validate the count let polled_messages = client .poll_messages( - &STREAM_ID.try_into().unwrap(), - &TOPIC_ID.try_into().unwrap(), + &STREAM_NAME.try_into().unwrap(), + &TOPIC_NAME.try_into().unwrap(), Some(PARTITION_ID), &Consumer::default(), &PollingStrategy::offset(0), @@ -114,7 +112,7 @@ async fn init_system(client: &IggyClient) { // 2. Create the topic client .create_topic( - &STREAM_ID.try_into().unwrap(), + &STREAM_NAME.try_into().unwrap(), TOPIC_NAME, PARTITIONS_COUNT, Default::default(), @@ -128,7 +126,7 @@ async fn init_system(client: &IggyClient) { async fn cleanup_system(client: &IggyClient) { client - .delete_stream(&STREAM_ID.try_into().unwrap()) + .delete_stream(&STREAM_NAME.try_into().unwrap()) .await .unwrap(); } @@ -179,8 +177,8 @@ async fn send_message_and_check_result( let send_result = client .send_messages( - &STREAM_ID.try_into().unwrap(), - &TOPIC_ID.try_into().unwrap(), + &STREAM_NAME.try_into().unwrap(), + &TOPIC_NAME.try_into().unwrap(), &Partitioning::partition_id(PARTITION_ID), &mut messages, ) diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs index 42d18859b..129f14177 100644 --- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs +++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs @@ -20,9 +20,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; +use crate::shard::namespace::IggyNamespace; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult}; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming; use crate::streaming::session::Session; use anyhow::Result; use error_set::ErrContext; @@ -45,31 +48,65 @@ impl ServerCommandHandler for DeleteSegments { shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); + let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - let partition_id = self.partition_id; + let partition_id = self.partition_id as usize; + let segments_count = self.segments_count; - shard - .delete_segments( - session, - &self.stream_id, - &self.topic_id, - self.partition_id as usize, - self.segments_count, - ) - .await - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to delete segments for topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", - ) - })?; - let event = ShardEvent::DeletedSegments { - stream_id: self.stream_id.clone(), - topic_id: self.topic_id.clone(), - partition_id: self.partition_id as usize, - segments_count: self.segments_count, - }; - shard.broadcast_event_to_all_shards(event).await?; + // Ensure authentication and topic exists + shard.ensure_authenticated(session)?; + shard.ensure_topic_exists(&stream_id, &topic_id)?; + shard.ensure_partition_exists(&stream_id, &topic_id, partition_id)?; + + // Get numeric IDs for namespace + let numeric_stream_id = shard + .streams2 + .with_stream_by_id(&stream_id, streaming::streams::helpers::get_stream_id()); + + let numeric_topic_id = shard.streams2.with_topic_by_id( + &stream_id, + &topic_id, + streaming::topics::helpers::get_topic_id(), + ); + + // Route request to the correct shard + let namespace = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id); + let payload = ShardRequestPayload::DeleteSegments { segments_count }; + let request = ShardRequest::new(stream_id.clone(), topic_id.clone(), partition_id, payload); + let message = ShardMessage::Request(request); + + match shard + .send_request_to_shard_or_recoil(Some(&namespace), message) + .await? + { + ShardSendRequestResult::Recoil(message) => { + if let ShardMessage::Request(crate::shard::transmission::message::ShardRequest { + stream_id, + topic_id, + partition_id, + payload, + }) = message + && let ShardRequestPayload::DeleteSegments { segments_count } = payload + { + shard + .delete_segments_base(&stream_id, &topic_id, partition_id, segments_count) + .await + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete segments for topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", + ) + })?; + } else { + return Err(IggyError::InvalidCommand); + } + } + ShardSendRequestResult::Response(response) => { + if !matches!(response, ShardResponse::DeleteSegments) { + return Err(IggyError::InvalidCommand); + } + } + } shard .state @@ -83,6 +120,7 @@ impl ServerCommandHandler for DeleteSegments { "{COMPONENT} (error: {error}) - failed to apply 'delete segments' command for partition with ID: {partition_id} in topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", ) })?; + sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 0c64dfe77..bfb04506b 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -388,7 +388,6 @@ impl IggyShard { |(_, _, _, offset, .., log)| { *log = loaded_log; let current_offset = log.active_segment().end_offset; - tracing::warn!("loaded current_offset: {}", current_offset); offset.store(current_offset, Ordering::Relaxed); }, ); @@ -514,6 +513,11 @@ impl IggyShard { .await?; Ok(ShardResponse::FlushUnsavedBuffer) } + ShardRequestPayload::DeleteSegments { segments_count } => { + self.delete_segments_base(&stream_id, &topic_id, partition_id, segments_count) + .await?; + Ok(ShardResponse::DeleteSegments) + } ShardRequestPayload::CreateStream { user_id, name } => { assert_eq!(self.id, 0, "CreateStream should only be handled by shard0"); @@ -833,21 +837,6 @@ impl IggyShard { Ok(()) } - ShardEvent::DeletedSegments { - stream_id, - topic_id, - partition_id, - segments_count, - } => { - self.delete_segments_bypass_auth( - &stream_id, - &topic_id, - partition_id, - segments_count, - ) - .await?; - Ok(()) - } ShardEvent::FlushUnsavedBuffer { stream_id, topic_id, diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs index 62d3ef0fa..aa2c4fa51 100644 --- a/core/server/src/shard/system/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ -17,7 +17,6 @@ use crate::streaming; * specific language governing permissions and limitations * under the License. */ -use crate::streaming::session::Session; use iggy_common::Identifier; use iggy_common::IggyError; @@ -82,20 +81,20 @@ impl IggyShard { let path = msg_writer.path(); drop(msg_writer); drop(index_writer); - compio::fs::remove_file(&path).await.map_err(|_| { - tracing::error!("Failed to delete segment file at path: {}", path); + compio::fs::remove_file(&path).await.map_err(|e| { + tracing::error!("Failed to delete segment file at path: {}, err: {}", path, e); IggyError::CannotDeleteFile })?; } else { let start_offset = segment.start_offset; - let path = self.config.system.get_segment_path( + let path = self.config.system.get_messages_file_path( numeric_stream_id, numeric_topic_id, partition_id, start_offset, ); - compio::fs::remove_file(&path).await.map_err(|_| { - tracing::error!("Failed to delete segment file at path: {}", path); + compio::fs::remove_file(&path).await.map_err(|e| { + tracing::error!("Failed to delete segment file at path: {}, err: {}", path, e); IggyError::CannotDeleteFile })?; } @@ -105,19 +104,4 @@ impl IggyShard { stats.increment_segments_count(1); Ok(()) } - - pub async fn delete_segments( - &self, - session: &Session, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: usize, - segments_count: u32, - ) -> Result<(), IggyError> { - // Assert authentication. - self.ensure_authenticated(session)?; - self.ensure_topic_exists(stream_id, topic_id)?; - self.delete_segments_base(stream_id, topic_id, partition_id, segments_count) - .await - } } diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 196c1edbb..e2ff1b0b1 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -218,233 +218,4 @@ impl IggyShard { Ok(()) } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::configs::server::ServerConfig; - use crate::shard::ShardInfo; - use crate::shard::transmission::connector::ShardConnector; - use crate::slab::streams::Streams; - use crate::slab::users::Users; - use crate::state::file::FileState; - use crate::streaming::persistence::persister::{FilePersister, PersisterKind}; - use crate::streaming::session::Session; - use crate::streaming::streams; - use crate::streaming::users::user::User; - use crate::streaming::utils::ptr::EternalPtr; - use crate::versioning::SemanticVersion; - use dashmap::DashMap; - use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; - use std::net::{Ipv4Addr, SocketAddr}; - use std::rc::Rc; - use std::sync::Arc; - - fn create_test_shard() -> Rc<IggyShard> { - let tempdir = tempfile::TempDir::new().unwrap(); - let state_path = tempdir.path().join("state.log"); - let config = ServerConfig::default(); - - let streams = Streams::default(); - let shards_table = Box::new(DashMap::new()); - let shards_table = Box::leak(shards_table); - let shards_table: EternalPtr<DashMap<crate::shard::namespace::IggyNamespace, ShardInfo>> = - shards_table.into(); - let users = Users::new(); - let state = FileState::new( - &state_path.to_string_lossy(), - &SemanticVersion::current().unwrap(), - Arc::new(PersisterKind::File(FilePersister)), - None, - ); - let connections = vec![ShardConnector::new(0)]; - - let builder = IggyShard::builder(); - let shard = builder - .id(0) - .streams(streams) - .shards_table(shards_table) - .state(state) - .users(users) - .connections(connections) - .config(config) - .encryptor(None) - .version(SemanticVersion::current().unwrap()) - .build(); - - Rc::new(shard) - } - - #[compio::test] - async fn should_create_and_get_stream_by_id_and_name() { - let shard = create_test_shard(); - - let stream_name = "test_stream"; - - // Initialize root user and session - let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); - let root_id = shard.users.insert(root); - let session = Session::new( - 1, - root_id as u32, - SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234), - ); - shard - .permissioner - .borrow_mut() - .init(&[&User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)]); - - // Create stream - let stream = shard - .create_stream2(&session, stream_name.to_string()) - .await - .unwrap(); - - let stream_id = stream.id(); - assert_eq!(stream.root().name(), stream_name); - - // Verify stream exists by ID - assert!( - shard - .streams2 - .exists(&Identifier::numeric(stream_id as u32).unwrap()) - ); - - // Verify stream exists by name - assert!( - shard - .streams2 - .exists(&Identifier::from_str_value(stream_name).unwrap()) - ); - - // Verify we can access stream data by ID - let retrieved_name = shard.streams2.with_stream_by_id( - &Identifier::numeric(stream_id as u32).unwrap(), - streams::helpers::get_stream_name(), - ); - assert_eq!(retrieved_name, stream_name); - - // Verify we can access stream data by name - let retrieved_id = shard.streams2.with_stream_by_id( - &Identifier::from_str_value(stream_name).unwrap(), - streams::helpers::get_stream_id(), - ); - assert_eq!(retrieved_id, stream_id); - } - - #[compio::test] - async fn should_update_stream_name() { - let shard = create_test_shard(); - - let initial_name = "initial_stream"; - let updated_name = "updated_stream"; - - // Initialize root user and session - let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); - let root_id = shard.users.insert(root); - let session = Session::new( - 1, - root_id as u32, - SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234), - ); - shard - .permissioner - .borrow_mut() - .init(&[&User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)]); - - // Create stream - let stream = shard - .create_stream2(&session, initial_name.to_string()) - .await - .unwrap(); - - let stream_id = stream.id(); - - // Update stream name - shard - .update_stream2( - &session, - &Identifier::numeric(stream_id as u32).unwrap(), - updated_name.to_string(), - ) - .unwrap(); - - // Verify old name doesn't exist - assert!( - !shard - .streams2 - .exists(&Identifier::from_str_value(initial_name).unwrap()) - ); - - // Verify new name exists - assert!( - shard - .streams2 - .exists(&Identifier::from_str_value(updated_name).unwrap()) - ); - - // Verify stream data - let retrieved_name = shard.streams2.with_stream_by_id( - &Identifier::numeric(stream_id as u32).unwrap(), - streams::helpers::get_stream_name(), - ); - assert_eq!(retrieved_name, updated_name); - } - - #[compio::test] - async fn should_delete_stream() { - let shard = create_test_shard(); - - let stream_name = "to_be_deleted"; - - // Initialize root user and session - let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); - let root_id = shard.users.insert(root); - let session = Session::new( - 1, - root_id as u32, - SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234), - ); - shard - .permissioner - .borrow_mut() - .init(&[&User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)]); - - // Create stream - let stream = shard - .create_stream2(&session, stream_name.to_string()) - .await - .unwrap(); - - let stream_id = stream.id(); - - // Verify stream exists - assert!( - shard - .streams2 - .exists(&Identifier::numeric(stream_id as u32).unwrap()) - ); - - // Delete stream - let deleted_stream = shard - .delete_stream2(&session, &Identifier::numeric(stream_id as u32).unwrap()) - .await - .unwrap(); - - assert_eq!(deleted_stream.id(), stream_id); - assert_eq!(deleted_stream.root().name(), stream_name); - - // Verify stream no longer exists - assert!( - !shard - .streams2 - .exists(&Identifier::numeric(stream_id as u32).unwrap()) - ); - assert!( - !shard - .streams2 - .exists(&Identifier::from_str_value(stream_name).unwrap()) - ); - } -} +} \ No newline at end of file diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index 9e51a3ad1..abc4ea140 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -114,12 +114,6 @@ pub enum ShardEvent { user_id: u32, name: String, }, - DeletedSegments { - stream_id: Identifier, - topic_id: Identifier, - partition_id: usize, - segments_count: u32, - }, AddressBound { protocol: TransportProtocol, address: SocketAddr, diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index 2bbd79b86..c07d95166 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -32,6 +32,7 @@ pub enum ShardResponse { PollMessages((IggyPollMetadata, IggyMessagesBatchSet)), SendMessages, FlushUnsavedBuffer, + DeleteSegments, Event, CreateStreamResponse(stream2::Stream), CreateTopicResponse(topic2::Topic), diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index a61a45052..ced203b2a 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -101,6 +101,9 @@ pub enum ShardRequestPayload { GetStats { user_id: u32, }, + DeleteSegments { + segments_count: u32, + }, } impl From<ShardRequest> for ShardMessage {
