This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 29ff93036 feat(io_uring): fix remaining tests (#2293)
29ff93036 is described below

commit 29ff93036f1570a380aec76e329cd0bcaae163d0
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Oct 27 12:52:08 2025 +0100

    feat(io_uring): fix remaining tests (#2293)
---
 .../consumer_offsets/get_consumer_offset.rs        |  34 ++-
 .../consumer_offsets/store_consumer_offset.rs      |  38 ++--
 core/common/src/commands/messages/poll_messages.rs |  43 ++--
 core/common/src/types/args/mod.rs                  |   6 +-
 core/integration/tests/sdk/producer/mod.rs         |   2 +-
 .../server/scenarios/delete_segments_scenario.rs   |  14 +-
 .../server/scenarios/message_size_scenario.rs      |  14 +-
 .../handlers/segments/delete_segments_handler.rs   |  84 ++++++--
 core/server/src/shard/mod.rs                       |  21 +-
 core/server/src/shard/system/segments.rs           |  26 +--
 core/server/src/shard/system/streams.rs            | 231 +--------------------
 core/server/src/shard/transmission/event.rs        |   6 -
 core/server/src/shard/transmission/frame.rs        |   1 +
 core/server/src/shard/transmission/message.rs      |   3 +
 14 files changed, 169 insertions(+), 354 deletions(-)

diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
index a98ac6c4d..8382025d1 100644
--- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
@@ -82,21 +82,24 @@ impl BytesSerializable for GetConsumerOffset {
         let stream_id_bytes = self.stream_id.to_bytes();
         let topic_id_bytes = self.topic_id.to_bytes();
         let mut bytes = BytesMut::with_capacity(
-            4 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
+            5 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
         );
         bytes.put_slice(&consumer_bytes);
         bytes.put_slice(&stream_id_bytes);
         bytes.put_slice(&topic_id_bytes);
+        // Encode partition_id with a flag byte: 1 = Some, 0 = None
         if let Some(partition_id) = self.partition_id {
+            bytes.put_u8(1);
             bytes.put_u32_le(partition_id);
         } else {
-            bytes.put_u32_le(0);
+            bytes.put_u8(0);
+            bytes.put_u32_le(0); // Padding to keep structure consistent
         }
         bytes.freeze()
     }
 
     fn from_bytes(bytes: Bytes) -> Result<GetConsumerOffset, IggyError> {
-        if bytes.len() < 15 {
+        if bytes.len() < 16 {
             return Err(IggyError::InvalidCommand);
         }
 
@@ -112,15 +115,17 @@ impl BytesSerializable for GetConsumerOffset {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let partition_id = u32::from_le_bytes(
-            bytes[position..position + 4]
+        // Decode partition_id with flag byte: 1 = Some, 0 = None
+        let has_partition_id = bytes[position];
+        let partition_id_value = u32::from_le_bytes(
+            bytes[position + 1..position + 5]
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
-        let partition_id = if partition_id == 0 {
-            None
+        let partition_id = if has_partition_id == 1 {
+            Some(partition_id_value)
         } else {
-            Some(partition_id)
+            None
         };
         let command = GetConsumerOffset {
             consumer,
@@ -171,13 +176,19 @@ mod tests {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = 
Identifier::from_bytes(bytes.slice(position..)).unwrap();
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let partition_id = u32::from_le_bytes(bytes[position..position + 
4].try_into().unwrap());
+        let has_partition_id = bytes[position];
+        let partition_id = u32::from_le_bytes(bytes[position + 1..position + 
5].try_into().unwrap());
+        let partition_id = if has_partition_id == 1 {
+            Some(partition_id)
+        } else {
+            None
+        };
 
         assert!(!bytes.is_empty());
         assert_eq!(consumer, command.consumer);
         assert_eq!(stream_id, command.stream_id);
         assert_eq!(topic_id, command.topic_id);
-        assert_eq!(Some(partition_id), command.partition_id);
+        assert_eq!(partition_id, command.partition_id);
     }
 
     #[test]
@@ -191,11 +202,12 @@ mod tests {
         let stream_id_bytes = stream_id.to_bytes();
         let topic_id_bytes = topic_id.to_bytes();
         let mut bytes = BytesMut::with_capacity(
-            4 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
+            5 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
         );
         bytes.put_slice(&consumer_bytes);
         bytes.put_slice(&stream_id_bytes);
         bytes.put_slice(&topic_id_bytes);
+        bytes.put_u8(1); // Flag: partition_id is Some
         bytes.put_u32_le(partition_id);
 
         let command = GetConsumerOffset::from_bytes(bytes.freeze());
diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
index 9500138d1..9f07f3ccf 100644
--- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
@@ -81,22 +81,25 @@ impl BytesSerializable for StoreConsumerOffset {
         let stream_id_bytes = self.stream_id.to_bytes();
         let topic_id_bytes = self.topic_id.to_bytes();
         let mut bytes = BytesMut::with_capacity(
-            12 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
+            13 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
         );
         bytes.put_slice(&consumer_bytes);
         bytes.put_slice(&stream_id_bytes);
         bytes.put_slice(&topic_id_bytes);
+        // Encode partition_id with a flag byte: 1 = Some, 0 = None
         if let Some(partition_id) = self.partition_id {
+            bytes.put_u8(1);
             bytes.put_u32_le(partition_id);
         } else {
-            bytes.put_u32_le(0);
+            bytes.put_u8(0);
+            bytes.put_u32_le(0); // Padding to keep structure consistent
         }
         bytes.put_u64_le(self.offset);
         bytes.freeze()
     }
 
     fn from_bytes(bytes: Bytes) -> Result<StoreConsumerOffset, IggyError> {
-        if bytes.len() < 23 {
+        if bytes.len() < 24 {
             return Err(IggyError::InvalidCommand);
         }
 
@@ -112,18 +115,20 @@ impl BytesSerializable for StoreConsumerOffset {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let partition_id = u32::from_le_bytes(
-            bytes[position..position + 4]
+        // Decode partition_id with flag byte: 1 = Some, 0 = None
+        let has_partition_id = bytes[position];
+        let partition_id_value = u32::from_le_bytes(
+            bytes[position + 1..position + 5]
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
-        let partition_id = if partition_id == 0 {
-            None
+        let partition_id = if has_partition_id == 1 {
+            Some(partition_id_value)
         } else {
-            Some(partition_id)
+            None
         };
         let offset = u64::from_le_bytes(
-            bytes[position + 4..position + 12]
+            bytes[position + 5..position + 13]
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
@@ -179,14 +184,20 @@ mod tests {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = 
Identifier::from_bytes(bytes.slice(position..)).unwrap();
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let partition_id = u32::from_le_bytes(bytes[position..position + 
4].try_into().unwrap());
-        let offset = u64::from_le_bytes(bytes[position + 4..position + 
12].try_into().unwrap());
+        let has_partition_id = bytes[position];
+        let partition_id = u32::from_le_bytes(bytes[position + 1..position + 
5].try_into().unwrap());
+        let partition_id = if has_partition_id == 1 {
+            Some(partition_id)
+        } else {
+            None
+        };
+        let offset = u64::from_le_bytes(bytes[position + 5..position + 
13].try_into().unwrap());
 
         assert!(!bytes.is_empty());
         assert_eq!(consumer, command.consumer);
         assert_eq!(stream_id, command.stream_id);
         assert_eq!(topic_id, command.topic_id);
-        assert_eq!(Some(partition_id), command.partition_id);
+        assert_eq!(partition_id, command.partition_id);
         assert_eq!(offset, command.offset);
     }
 
@@ -202,11 +213,12 @@ mod tests {
         let stream_id_bytes = stream_id.to_bytes();
         let topic_id_bytes = topic_id.to_bytes();
         let mut bytes = BytesMut::with_capacity(
-            12 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
+            13 + consumer_bytes.len() + stream_id_bytes.len() + 
topic_id_bytes.len(),
         );
         bytes.put_slice(&consumer_bytes);
         bytes.put_slice(&stream_id_bytes);
         bytes.put_slice(&topic_id_bytes);
+        bytes.put_u8(1); // Flag: partition_id is Some
         bytes.put_u32_le(partition_id);
         bytes.put_u64_le(offset);
 
diff --git a/core/common/src/commands/messages/poll_messages.rs b/core/common/src/commands/messages/poll_messages.rs
index e51b03ef6..dbbca1bb1 100644
--- a/core/common/src/commands/messages/poll_messages.rs
+++ b/core/common/src/commands/messages/poll_messages.rs
@@ -76,7 +76,7 @@ impl PollMessages {
         let topic_id_bytes = topic_id.to_bytes();
         let strategy_bytes = strategy.to_bytes();
         let mut bytes = BytesMut::with_capacity(
-            9 + consumer_bytes.len()
+            10 + consumer_bytes.len()
                 + stream_id_bytes.len()
                 + topic_id_bytes.len()
                 + strategy_bytes.len(),
@@ -84,10 +84,13 @@ impl PollMessages {
         bytes.put_slice(&consumer_bytes);
         bytes.put_slice(&stream_id_bytes);
         bytes.put_slice(&topic_id_bytes);
+        // Encode partition_id with a flag byte: 1 = Some, 0 = None
         if let Some(partition_id) = partition_id {
+            bytes.put_u8(1);
             bytes.put_u32_le(partition_id);
         } else {
-            bytes.put_u32_le(0);
+            bytes.put_u8(0);
+            bytes.put_u32_le(0); // Padding to keep structure consistent
         }
         bytes.put_slice(&strategy_bytes);
         bytes.put_u32_le(count);
@@ -149,7 +152,7 @@ impl BytesSerializable for PollMessages {
     }
 
     fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
-        if bytes.len() < 29 {
+        if bytes.len() < 30 {
             return Err(IggyError::InvalidCommand);
         }
 
@@ -165,17 +168,20 @@ impl BytesSerializable for PollMessages {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let partition_id = u32::from_le_bytes(
-            bytes[position..position + 4]
+        // Decode partition_id with flag byte: 1 = Some, 0 = None
+        let has_partition_id = bytes[position];
+        let partition_id_value = u32::from_le_bytes(
+            bytes[position + 1..position + 5]
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
-        let partition_id = match partition_id {
-            0 => None,
-            partition_id => Some(partition_id),
+        let partition_id = if has_partition_id == 1 {
+            Some(partition_id_value)
+        } else {
+            None
         };
-        let polling_kind = PollingKind::from_code(bytes[position + 4])?;
-        position += 5;
+        let polling_kind = PollingKind::from_code(bytes[position + 5])?;
+        position += 6;
         let value = u64::from_le_bytes(
             bytes[position..position + 8]
                 .try_into()
@@ -254,9 +260,15 @@ mod tests {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = 
Identifier::from_bytes(bytes.slice(position..)).unwrap();
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let partition_id = u32::from_le_bytes(bytes[position..position + 
4].try_into().unwrap());
-        let polling_kind = PollingKind::from_code(bytes[position + 
4]).unwrap();
-        position += 5;
+        let has_partition_id = bytes[position];
+        let partition_id = u32::from_le_bytes(bytes[position + 1..position + 
5].try_into().unwrap());
+        let partition_id = if has_partition_id == 1 {
+            Some(partition_id)
+        } else {
+            None
+        };
+        let polling_kind = PollingKind::from_code(bytes[position + 
5]).unwrap();
+        position += 6;
         let value = u64::from_le_bytes(bytes[position..position + 
8].try_into().unwrap());
         let strategy = PollingStrategy {
             kind: polling_kind,
@@ -270,7 +282,7 @@ mod tests {
         assert_eq!(consumer, command.consumer);
         assert_eq!(stream_id, command.stream_id);
         assert_eq!(topic_id, command.topic_id);
-        assert_eq!(Some(partition_id), command.partition_id);
+        assert_eq!(partition_id, command.partition_id);
         assert_eq!(strategy, command.strategy);
         assert_eq!(count, command.count);
         assert_eq!(auto_commit, command.auto_commit);
@@ -291,7 +303,7 @@ mod tests {
         let topic_id_bytes = topic_id.to_bytes();
         let strategy_bytes = strategy.to_bytes();
         let mut bytes = BytesMut::with_capacity(
-            9 + consumer_bytes.len()
+            10 + consumer_bytes.len()
                 + stream_id_bytes.len()
                 + topic_id_bytes.len()
                 + strategy_bytes.len(),
@@ -299,6 +311,7 @@ mod tests {
         bytes.put_slice(&consumer_bytes);
         bytes.put_slice(&stream_id_bytes);
         bytes.put_slice(&topic_id_bytes);
+        bytes.put_u8(1); // Flag: partition_id is Some
         bytes.put_u32_le(partition_id);
         bytes.put_slice(&strategy_bytes);
         bytes.put_u32_le(count);
diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs
index 11f93194e..cc22f9439 100644
--- a/core/common/src/types/args/mod.rs
+++ b/core/common/src/types/args/mod.rs
@@ -408,11 +408,11 @@ impl Default for Args {
             quic_max_concurrent_bidi_streams: 10000,
             quic_datagram_send_buffer_size: 100000,
             quic_initial_mtu: 1200,
-            quic_send_window: 100000,
-            quic_receive_window: 100000,
+            quic_send_window: 1000000,
+            quic_receive_window: 1000000,
             quic_response_buffer_size: 1048576,
             quic_keep_alive_interval: 5000,
-            quic_max_idle_timeout: 10000,
+            quic_max_idle_timeout: 100000,
             quic_validate_certificate: false,
             quic_heartbeat_interval: "5s".to_string(),
             websocket_server_address: "127.0.0.1:8092".to_string(),
diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs
index 455fec872..74e194198 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
 use iggy::clients::client::IggyClient;
 use iggy::prelude::*;
 
-const PARTITION_ID: u32 = 1;
+const PARTITION_ID: u32 = 0;
 const STREAM_NAME: &str = "test-stream-producer";
 const TOPIC_NAME: &str = "test-topic-producer";
 const PARTITIONS_COUNT: u32 = 3;
diff --git a/core/integration/tests/server/scenarios/delete_segments_scenario.rs b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
index a35c5bf1e..a5959e5b5 100644
--- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs
+++ b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
@@ -20,11 +20,9 @@ use iggy::prelude::*;
 use integration::test_server::{ClientFactory, TestServer};
 use std::fs::{DirEntry, read_dir};
 
-const STREAM_ID: u32 = 1;
 const STREAM_NAME: &str = "test_stream";
-const TOPIC_ID: u32 = 1;
 const TOPIC_NAME: &str = "test_topic";
-const PARTITION_ID: u32 = 1;
+const PARTITION_ID: u32 = 0;
 const LOG_EXTENSION: &str = "log";
 
 pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer) 
{
@@ -36,13 +34,14 @@ pub async fn run(client_factory: &dyn ClientFactory, 
test_server: &TestServer) {
         .await
         .unwrap();
 
-    client.create_stream(STREAM_NAME).await.unwrap();
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
 
-    client
+    let topic = client
         .create_topic(
             &Identifier::named(STREAM_NAME).unwrap(),
             TOPIC_NAME,
-            PARTITION_ID,
+            1,
             CompressionAlgorithm::None,
             None,
             IggyExpiry::NeverExpire,
@@ -50,6 +49,7 @@ pub async fn run(client_factory: &dyn ClientFactory, 
test_server: &TestServer) {
         )
         .await
         .unwrap();
+    let topic_id = topic.id;
 
     // Send 5 large messages to create multiple segments
     let large_payload = "A".repeat(1024 * 1024);
@@ -79,7 +79,7 @@ pub async fn run(client_factory: &dyn ClientFactory, 
test_server: &TestServer) {
     // Check initial segment count on filesystem
     let data_path = test_server.get_local_data_path();
     let partition_path =
-        
format!("{data_path}/streams/{STREAM_ID}/topics/{TOPIC_ID}/partitions/{PARTITION_ID}");
+        
format!("{data_path}/streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}");
 
     let initial_segments = get_segment_paths_for_partition(&partition_path);
     println!(
diff --git a/core/integration/tests/server/scenarios/message_size_scenario.rs b/core/integration/tests/server/scenarios/message_size_scenario.rs
index a6430f59a..1eb3bf94d 100644
--- a/core/integration/tests/server/scenarios/message_size_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_size_scenario.rs
@@ -22,8 +22,6 @@ use integration::test_server::{ClientFactory, 
assert_clean_system, login_root};
 use std::collections::HashMap;
 use std::str::FromStr;
 
-const STREAM_ID: u32 = 1;
-const TOPIC_ID: u32 = 1;
 const STREAM_NAME: &str = "test-stream";
 const TOPIC_NAME: &str = "test-topic";
 const PARTITIONS_COUNT: u32 = 3;
@@ -93,8 +91,8 @@ async fn assert_message_count(client: &IggyClient, 
expected_count: u32) {
     // 4. Poll messages and validate the count
     let polled_messages = client
         .poll_messages(
-            &STREAM_ID.try_into().unwrap(),
-            &TOPIC_ID.try_into().unwrap(),
+            &STREAM_NAME.try_into().unwrap(),
+            &TOPIC_NAME.try_into().unwrap(),
             Some(PARTITION_ID),
             &Consumer::default(),
             &PollingStrategy::offset(0),
@@ -114,7 +112,7 @@ async fn init_system(client: &IggyClient) {
     // 2. Create the topic
     client
         .create_topic(
-            &STREAM_ID.try_into().unwrap(),
+            &STREAM_NAME.try_into().unwrap(),
             TOPIC_NAME,
             PARTITIONS_COUNT,
             Default::default(),
@@ -128,7 +126,7 @@ async fn init_system(client: &IggyClient) {
 
 async fn cleanup_system(client: &IggyClient) {
     client
-        .delete_stream(&STREAM_ID.try_into().unwrap())
+        .delete_stream(&STREAM_NAME.try_into().unwrap())
         .await
         .unwrap();
 }
@@ -179,8 +177,8 @@ async fn send_message_and_check_result(
 
     let send_result = client
         .send_messages(
-            &STREAM_ID.try_into().unwrap(),
-            &TOPIC_ID.try_into().unwrap(),
+            &STREAM_NAME.try_into().unwrap(),
+            &TOPIC_NAME.try_into().unwrap(),
             &Partitioning::partition_id(PARTITION_ID),
             &mut messages,
         )
diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index 42d18859b..129f14177 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -20,9 +20,12 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
 
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload, ShardSendRequestResult};
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use error_set::ErrContext;
@@ -45,31 +48,65 @@ impl ServerCommandHandler for DeleteSegments {
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
+        
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
-        let partition_id = self.partition_id;
+        let partition_id = self.partition_id as usize;
+        let segments_count = self.segments_count;
 
-        shard
-            .delete_segments(
-                session,
-                &self.stream_id,
-                &self.topic_id,
-                self.partition_id as usize,
-                self.segments_count,
-            )
-            .await
-            .with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to delete segments 
for topic with ID: {topic_id} in stream with ID: {stream_id}, session: 
{session}",
-                )
-            })?;
-        let event = ShardEvent::DeletedSegments {
-            stream_id: self.stream_id.clone(),
-            topic_id: self.topic_id.clone(),
-            partition_id: self.partition_id as usize,
-            segments_count: self.segments_count,
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
+        // Ensure authentication and topic exists
+        shard.ensure_authenticated(session)?;
+        shard.ensure_topic_exists(&stream_id, &topic_id)?;
+        shard.ensure_partition_exists(&stream_id, &topic_id, partition_id)?;
+
+        // Get numeric IDs for namespace
+        let numeric_stream_id = shard
+            .streams2
+            .with_stream_by_id(&stream_id, 
streaming::streams::helpers::get_stream_id());
+
+        let numeric_topic_id = shard.streams2.with_topic_by_id(
+            &stream_id,
+            &topic_id,
+            streaming::topics::helpers::get_topic_id(),
+        );
+
+        // Route request to the correct shard
+        let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
+        let payload = ShardRequestPayload::DeleteSegments { segments_count };
+        let request = ShardRequest::new(stream_id.clone(), topic_id.clone(), 
partition_id, payload);
+        let message = ShardMessage::Request(request);
+        
+        match shard
+            .send_request_to_shard_or_recoil(Some(&namespace), message)
+            .await?
+        {
+            ShardSendRequestResult::Recoil(message) => {
+                if let 
ShardMessage::Request(crate::shard::transmission::message::ShardRequest {
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    payload,
+                }) = message
+                    && let ShardRequestPayload::DeleteSegments { 
segments_count } = payload
+                {
+                    shard
+                        .delete_segments_base(&stream_id, &topic_id, 
partition_id, segments_count)
+                        .await
+                        .with_error_context(|error| {
+                            format!(
+                                "{COMPONENT} (error: {error}) - failed to 
delete segments for topic with ID: {topic_id} in stream with ID: {stream_id}, 
session: {session}",
+                            )
+                        })?;
+                } else {
+                    return Err(IggyError::InvalidCommand);
+                }
+            }
+            ShardSendRequestResult::Response(response) => {
+                if !matches!(response, ShardResponse::DeleteSegments) {
+                    return Err(IggyError::InvalidCommand);
+                }
+            }
+        }
 
         shard
             .state
@@ -83,6 +120,7 @@ impl ServerCommandHandler for DeleteSegments {
                     "{COMPONENT} (error: {error}) - failed to apply 'delete 
segments' command for partition with ID: {partition_id} in topic with ID: 
{topic_id} in stream with ID: {stream_id}, session: {session}",
                 )
             })?;
+        
         sender.send_empty_ok_response().await?;
         Ok(())
     }
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 0c64dfe77..bfb04506b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -388,7 +388,6 @@ impl IggyShard {
                             |(_, _, _, offset, .., log)| {
                                 *log = loaded_log;
                                 let current_offset = 
log.active_segment().end_offset;
-                                tracing::warn!("loaded current_offset: {}", 
current_offset);
                                 offset.store(current_offset, 
Ordering::Relaxed);
                             },
                         );
@@ -514,6 +513,11 @@ impl IggyShard {
                     .await?;
                 Ok(ShardResponse::FlushUnsavedBuffer)
             }
+            ShardRequestPayload::DeleteSegments { segments_count } => {
+                self.delete_segments_base(&stream_id, &topic_id, partition_id, 
segments_count)
+                    .await?;
+                Ok(ShardResponse::DeleteSegments)
+            }
             ShardRequestPayload::CreateStream { user_id, name } => {
                 assert_eq!(self.id, 0, "CreateStream should only be handled by 
shard0");
 
@@ -833,21 +837,6 @@ impl IggyShard {
 
                 Ok(())
             }
-            ShardEvent::DeletedSegments {
-                stream_id,
-                topic_id,
-                partition_id,
-                segments_count,
-            } => {
-                self.delete_segments_bypass_auth(
-                    &stream_id,
-                    &topic_id,
-                    partition_id,
-                    segments_count,
-                )
-                .await?;
-                Ok(())
-            }
             ShardEvent::FlushUnsavedBuffer {
                 stream_id,
                 topic_id,
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 62d3ef0fa..aa2c4fa51 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -17,7 +17,6 @@ use crate::streaming;
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::streaming::session::Session;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
 
@@ -82,20 +81,20 @@ impl IggyShard {
                 let path = msg_writer.path();
                 drop(msg_writer);
                 drop(index_writer);
-                compio::fs::remove_file(&path).await.map_err(|_| {
-                    tracing::error!("Failed to delete segment file at path: 
{}", path);
+                compio::fs::remove_file(&path).await.map_err(|e| {
+                    tracing::error!("Failed to delete segment file at path: 
{}, err: {}", path, e);
                     IggyError::CannotDeleteFile
                 })?;
             } else {
                 let start_offset = segment.start_offset;
-                let path = self.config.system.get_segment_path(
+                let path = self.config.system.get_messages_file_path(
                     numeric_stream_id,
                     numeric_topic_id,
                     partition_id,
                     start_offset,
                 );
-                compio::fs::remove_file(&path).await.map_err(|_| {
-                    tracing::error!("Failed to delete segment file at path: 
{}", path);
+                compio::fs::remove_file(&path).await.map_err(|e| {
+                    tracing::error!("Failed to delete segment file at path: 
{}, err: {}", path, e);
                     IggyError::CannotDeleteFile
                 })?;
             }
@@ -105,19 +104,4 @@ impl IggyShard {
         stats.increment_segments_count(1);
         Ok(())
     }
-
-    pub async fn delete_segments(
-        &self,
-        session: &Session,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partition_id: usize,
-        segments_count: u32,
-    ) -> Result<(), IggyError> {
-        // Assert authentication.
-        self.ensure_authenticated(session)?;
-        self.ensure_topic_exists(stream_id, topic_id)?;
-        self.delete_segments_base(stream_id, topic_id, partition_id, 
segments_count)
-            .await
-    }
 }
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index 196c1edbb..e2ff1b0b1 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -218,233 +218,4 @@ impl IggyShard {
 
         Ok(())
     }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::configs::server::ServerConfig;
-    use crate::shard::ShardInfo;
-    use crate::shard::transmission::connector::ShardConnector;
-    use crate::slab::streams::Streams;
-    use crate::slab::users::Users;
-    use crate::state::file::FileState;
-    use crate::streaming::persistence::persister::{FilePersister, 
PersisterKind};
-    use crate::streaming::session::Session;
-    use crate::streaming::streams;
-    use crate::streaming::users::user::User;
-    use crate::streaming::utils::ptr::EternalPtr;
-    use crate::versioning::SemanticVersion;
-    use dashmap::DashMap;
-    use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
-    use std::net::{Ipv4Addr, SocketAddr};
-    use std::rc::Rc;
-    use std::sync::Arc;
-
-    fn create_test_shard() -> Rc<IggyShard> {
-        let tempdir = tempfile::TempDir::new().unwrap();
-        let state_path = tempdir.path().join("state.log");
-        let config = ServerConfig::default();
-
-        let streams = Streams::default();
-        let shards_table = Box::new(DashMap::new());
-        let shards_table = Box::leak(shards_table);
-        let shards_table: 
EternalPtr<DashMap<crate::shard::namespace::IggyNamespace, ShardInfo>> =
-            shards_table.into();
-        let users = Users::new();
-        let state = FileState::new(
-            &state_path.to_string_lossy(),
-            &SemanticVersion::current().unwrap(),
-            Arc::new(PersisterKind::File(FilePersister)),
-            None,
-        );
-        let connections = vec![ShardConnector::new(0)];
-
-        let builder = IggyShard::builder();
-        let shard = builder
-            .id(0)
-            .streams(streams)
-            .shards_table(shards_table)
-            .state(state)
-            .users(users)
-            .connections(connections)
-            .config(config)
-            .encryptor(None)
-            .version(SemanticVersion::current().unwrap())
-            .build();
-
-        Rc::new(shard)
-    }
-
-    #[compio::test]
-    async fn should_create_and_get_stream_by_id_and_name() {
-        let shard = create_test_shard();
-
-        let stream_name = "test_stream";
-
-        // Initialize root user and session
-        let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
-        let root_id = shard.users.insert(root);
-        let session = Session::new(
-            1,
-            root_id as u32,
-            SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234),
-        );
-        shard
-            .permissioner
-            .borrow_mut()
-            .init(&[&User::root(DEFAULT_ROOT_USERNAME, 
DEFAULT_ROOT_PASSWORD)]);
-
-        // Create stream
-        let stream = shard
-            .create_stream2(&session, stream_name.to_string())
-            .await
-            .unwrap();
-
-        let stream_id = stream.id();
-        assert_eq!(stream.root().name(), stream_name);
-
-        // Verify stream exists by ID
-        assert!(
-            shard
-                .streams2
-                .exists(&Identifier::numeric(stream_id as u32).unwrap())
-        );
-
-        // Verify stream exists by name
-        assert!(
-            shard
-                .streams2
-                .exists(&Identifier::from_str_value(stream_name).unwrap())
-        );
-
-        // Verify we can access stream data by ID
-        let retrieved_name = shard.streams2.with_stream_by_id(
-            &Identifier::numeric(stream_id as u32).unwrap(),
-            streams::helpers::get_stream_name(),
-        );
-        assert_eq!(retrieved_name, stream_name);
-
-        // Verify we can access stream data by name
-        let retrieved_id = shard.streams2.with_stream_by_id(
-            &Identifier::from_str_value(stream_name).unwrap(),
-            streams::helpers::get_stream_id(),
-        );
-        assert_eq!(retrieved_id, stream_id);
-    }
-
-    #[compio::test]
-    async fn should_update_stream_name() {
-        let shard = create_test_shard();
-
-        let initial_name = "initial_stream";
-        let updated_name = "updated_stream";
-
-        // Initialize root user and session
-        let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
-        let root_id = shard.users.insert(root);
-        let session = Session::new(
-            1,
-            root_id as u32,
-            SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234),
-        );
-        shard
-            .permissioner
-            .borrow_mut()
-            .init(&[&User::root(DEFAULT_ROOT_USERNAME, 
DEFAULT_ROOT_PASSWORD)]);
-
-        // Create stream
-        let stream = shard
-            .create_stream2(&session, initial_name.to_string())
-            .await
-            .unwrap();
-
-        let stream_id = stream.id();
-
-        // Update stream name
-        shard
-            .update_stream2(
-                &session,
-                &Identifier::numeric(stream_id as u32).unwrap(),
-                updated_name.to_string(),
-            )
-            .unwrap();
-
-        // Verify old name doesn't exist
-        assert!(
-            !shard
-                .streams2
-                .exists(&Identifier::from_str_value(initial_name).unwrap())
-        );
-
-        // Verify new name exists
-        assert!(
-            shard
-                .streams2
-                .exists(&Identifier::from_str_value(updated_name).unwrap())
-        );
-
-        // Verify stream data
-        let retrieved_name = shard.streams2.with_stream_by_id(
-            &Identifier::numeric(stream_id as u32).unwrap(),
-            streams::helpers::get_stream_name(),
-        );
-        assert_eq!(retrieved_name, updated_name);
-    }
-
-    #[compio::test]
-    async fn should_delete_stream() {
-        let shard = create_test_shard();
-
-        let stream_name = "to_be_deleted";
-
-        // Initialize root user and session
-        let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
-        let root_id = shard.users.insert(root);
-        let session = Session::new(
-            1,
-            root_id as u32,
-            SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234),
-        );
-        shard
-            .permissioner
-            .borrow_mut()
-            .init(&[&User::root(DEFAULT_ROOT_USERNAME, 
DEFAULT_ROOT_PASSWORD)]);
-
-        // Create stream
-        let stream = shard
-            .create_stream2(&session, stream_name.to_string())
-            .await
-            .unwrap();
-
-        let stream_id = stream.id();
-
-        // Verify stream exists
-        assert!(
-            shard
-                .streams2
-                .exists(&Identifier::numeric(stream_id as u32).unwrap())
-        );
-
-        // Delete stream
-        let deleted_stream = shard
-            .delete_stream2(&session, &Identifier::numeric(stream_id as 
u32).unwrap())
-            .await
-            .unwrap();
-
-        assert_eq!(deleted_stream.id(), stream_id);
-        assert_eq!(deleted_stream.root().name(), stream_name);
-
-        // Verify stream no longer exists
-        assert!(
-            !shard
-                .streams2
-                .exists(&Identifier::numeric(stream_id as u32).unwrap())
-        );
-        assert!(
-            !shard
-                .streams2
-                .exists(&Identifier::from_str_value(stream_name).unwrap())
-        );
-    }
-}
+}
\ No newline at end of file
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 9e51a3ad1..abc4ea140 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -114,12 +114,6 @@ pub enum ShardEvent {
         user_id: u32,
         name: String,
     },
-    DeletedSegments {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        partition_id: usize,
-        segments_count: u32,
-    },
     AddressBound {
         protocol: TransportProtocol,
         address: SocketAddr,
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index 2bbd79b86..c07d95166 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -32,6 +32,7 @@ pub enum ShardResponse {
     PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
     SendMessages,
     FlushUnsavedBuffer,
+    DeleteSegments,
     Event,
     CreateStreamResponse(stream2::Stream),
     CreateTopicResponse(topic2::Topic),
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index a61a45052..ced203b2a 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -101,6 +101,9 @@ pub enum ShardRequestPayload {
     GetStats {
         user_id: u32,
     },
+    DeleteSegments {
+        segments_count: u32,
+    },
 }
 
 impl From<ShardRequest> for ShardMessage {


Reply via email to