This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 29ff93036 feat(io_uring): fix remaining tests (#2293)
29ff93036 is described below
commit 29ff93036f1570a380aec76e329cd0bcaae163d0
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Oct 27 12:52:08 2025 +0100
feat(io_uring): fix remaining tests (#2293)
---
.../consumer_offsets/get_consumer_offset.rs | 34 ++-
.../consumer_offsets/store_consumer_offset.rs | 38 ++--
core/common/src/commands/messages/poll_messages.rs | 43 ++--
core/common/src/types/args/mod.rs | 6 +-
core/integration/tests/sdk/producer/mod.rs | 2 +-
.../server/scenarios/delete_segments_scenario.rs | 14 +-
.../server/scenarios/message_size_scenario.rs | 14 +-
.../handlers/segments/delete_segments_handler.rs | 84 ++++++--
core/server/src/shard/mod.rs | 21 +-
core/server/src/shard/system/segments.rs | 26 +--
core/server/src/shard/system/streams.rs | 231 +--------------------
core/server/src/shard/transmission/event.rs | 6 -
core/server/src/shard/transmission/frame.rs | 1 +
core/server/src/shard/transmission/message.rs | 3 +
14 files changed, 169 insertions(+), 354 deletions(-)
diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
index a98ac6c4d..8382025d1 100644
--- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
@@ -82,21 +82,24 @@ impl BytesSerializable for GetConsumerOffset {
let stream_id_bytes = self.stream_id.to_bytes();
let topic_id_bytes = self.topic_id.to_bytes();
let mut bytes = BytesMut::with_capacity(
- 4 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
+ 5 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
);
bytes.put_slice(&consumer_bytes);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
+ // Encode partition_id with a flag byte: 1 = Some, 0 = None
if let Some(partition_id) = self.partition_id {
+ bytes.put_u8(1);
bytes.put_u32_le(partition_id);
} else {
- bytes.put_u32_le(0);
+ bytes.put_u8(0);
+ bytes.put_u32_le(0); // Padding to keep structure consistent
}
bytes.freeze()
}
fn from_bytes(bytes: Bytes) -> Result<GetConsumerOffset, IggyError> {
- if bytes.len() < 15 {
+ if bytes.len() < 16 {
return Err(IggyError::InvalidCommand);
}
@@ -112,15 +115,17 @@ impl BytesSerializable for GetConsumerOffset {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- let partition_id = u32::from_le_bytes(
- bytes[position..position + 4]
+ // Decode partition_id with flag byte: 1 = Some, 0 = None
+ let has_partition_id = bytes[position];
+ let partition_id_value = u32::from_le_bytes(
+ bytes[position + 1..position + 5]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
- let partition_id = if partition_id == 0 {
- None
+ let partition_id = if has_partition_id == 1 {
+ Some(partition_id_value)
} else {
- Some(partition_id)
+ None
};
let command = GetConsumerOffset {
consumer,
@@ -171,13 +176,19 @@ mod tests {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id =
Identifier::from_bytes(bytes.slice(position..)).unwrap();
position += topic_id.get_size_bytes().as_bytes_usize();
- let partition_id = u32::from_le_bytes(bytes[position..position +
4].try_into().unwrap());
+ let has_partition_id = bytes[position];
+ let partition_id = u32::from_le_bytes(bytes[position + 1..position +
5].try_into().unwrap());
+ let partition_id = if has_partition_id == 1 {
+ Some(partition_id)
+ } else {
+ None
+ };
assert!(!bytes.is_empty());
assert_eq!(consumer, command.consumer);
assert_eq!(stream_id, command.stream_id);
assert_eq!(topic_id, command.topic_id);
- assert_eq!(Some(partition_id), command.partition_id);
+ assert_eq!(partition_id, command.partition_id);
}
#[test]
@@ -191,11 +202,12 @@ mod tests {
let stream_id_bytes = stream_id.to_bytes();
let topic_id_bytes = topic_id.to_bytes();
let mut bytes = BytesMut::with_capacity(
- 4 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
+ 5 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
);
bytes.put_slice(&consumer_bytes);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
+ bytes.put_u8(1); // Flag: partition_id is Some
bytes.put_u32_le(partition_id);
let command = GetConsumerOffset::from_bytes(bytes.freeze());
diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
index 9500138d1..9f07f3ccf 100644
--- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
@@ -81,22 +81,25 @@ impl BytesSerializable for StoreConsumerOffset {
let stream_id_bytes = self.stream_id.to_bytes();
let topic_id_bytes = self.topic_id.to_bytes();
let mut bytes = BytesMut::with_capacity(
- 12 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
+ 13 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
);
bytes.put_slice(&consumer_bytes);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
+ // Encode partition_id with a flag byte: 1 = Some, 0 = None
if let Some(partition_id) = self.partition_id {
+ bytes.put_u8(1);
bytes.put_u32_le(partition_id);
} else {
- bytes.put_u32_le(0);
+ bytes.put_u8(0);
+ bytes.put_u32_le(0); // Padding to keep structure consistent
}
bytes.put_u64_le(self.offset);
bytes.freeze()
}
fn from_bytes(bytes: Bytes) -> Result<StoreConsumerOffset, IggyError> {
- if bytes.len() < 23 {
+ if bytes.len() < 24 {
return Err(IggyError::InvalidCommand);
}
@@ -112,18 +115,20 @@ impl BytesSerializable for StoreConsumerOffset {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- let partition_id = u32::from_le_bytes(
- bytes[position..position + 4]
+ // Decode partition_id with flag byte: 1 = Some, 0 = None
+ let has_partition_id = bytes[position];
+ let partition_id_value = u32::from_le_bytes(
+ bytes[position + 1..position + 5]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
- let partition_id = if partition_id == 0 {
- None
+ let partition_id = if has_partition_id == 1 {
+ Some(partition_id_value)
} else {
- Some(partition_id)
+ None
};
let offset = u64::from_le_bytes(
- bytes[position + 4..position + 12]
+ bytes[position + 5..position + 13]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
@@ -179,14 +184,20 @@ mod tests {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id =
Identifier::from_bytes(bytes.slice(position..)).unwrap();
position += topic_id.get_size_bytes().as_bytes_usize();
- let partition_id = u32::from_le_bytes(bytes[position..position +
4].try_into().unwrap());
- let offset = u64::from_le_bytes(bytes[position + 4..position +
12].try_into().unwrap());
+ let has_partition_id = bytes[position];
+ let partition_id = u32::from_le_bytes(bytes[position + 1..position +
5].try_into().unwrap());
+ let partition_id = if has_partition_id == 1 {
+ Some(partition_id)
+ } else {
+ None
+ };
+ let offset = u64::from_le_bytes(bytes[position + 5..position +
13].try_into().unwrap());
assert!(!bytes.is_empty());
assert_eq!(consumer, command.consumer);
assert_eq!(stream_id, command.stream_id);
assert_eq!(topic_id, command.topic_id);
- assert_eq!(Some(partition_id), command.partition_id);
+ assert_eq!(partition_id, command.partition_id);
assert_eq!(offset, command.offset);
}
@@ -202,11 +213,12 @@ mod tests {
let stream_id_bytes = stream_id.to_bytes();
let topic_id_bytes = topic_id.to_bytes();
let mut bytes = BytesMut::with_capacity(
- 12 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
+ 13 + consumer_bytes.len() + stream_id_bytes.len() +
topic_id_bytes.len(),
);
bytes.put_slice(&consumer_bytes);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
+ bytes.put_u8(1); // Flag: partition_id is Some
bytes.put_u32_le(partition_id);
bytes.put_u64_le(offset);
diff --git a/core/common/src/commands/messages/poll_messages.rs b/core/common/src/commands/messages/poll_messages.rs
index e51b03ef6..dbbca1bb1 100644
--- a/core/common/src/commands/messages/poll_messages.rs
+++ b/core/common/src/commands/messages/poll_messages.rs
@@ -76,7 +76,7 @@ impl PollMessages {
let topic_id_bytes = topic_id.to_bytes();
let strategy_bytes = strategy.to_bytes();
let mut bytes = BytesMut::with_capacity(
- 9 + consumer_bytes.len()
+ 10 + consumer_bytes.len()
+ stream_id_bytes.len()
+ topic_id_bytes.len()
+ strategy_bytes.len(),
@@ -84,10 +84,13 @@ impl PollMessages {
bytes.put_slice(&consumer_bytes);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
+ // Encode partition_id with a flag byte: 1 = Some, 0 = None
if let Some(partition_id) = partition_id {
+ bytes.put_u8(1);
bytes.put_u32_le(partition_id);
} else {
- bytes.put_u32_le(0);
+ bytes.put_u8(0);
+ bytes.put_u32_le(0); // Padding to keep structure consistent
}
bytes.put_slice(&strategy_bytes);
bytes.put_u32_le(count);
@@ -149,7 +152,7 @@ impl BytesSerializable for PollMessages {
}
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
- if bytes.len() < 29 {
+ if bytes.len() < 30 {
return Err(IggyError::InvalidCommand);
}
@@ -165,17 +168,20 @@ impl BytesSerializable for PollMessages {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- let partition_id = u32::from_le_bytes(
- bytes[position..position + 4]
+ // Decode partition_id with flag byte: 1 = Some, 0 = None
+ let has_partition_id = bytes[position];
+ let partition_id_value = u32::from_le_bytes(
+ bytes[position + 1..position + 5]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
- let partition_id = match partition_id {
- 0 => None,
- partition_id => Some(partition_id),
+ let partition_id = if has_partition_id == 1 {
+ Some(partition_id_value)
+ } else {
+ None
};
- let polling_kind = PollingKind::from_code(bytes[position + 4])?;
- position += 5;
+ let polling_kind = PollingKind::from_code(bytes[position + 5])?;
+ position += 6;
let value = u64::from_le_bytes(
bytes[position..position + 8]
.try_into()
@@ -254,9 +260,15 @@ mod tests {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id =
Identifier::from_bytes(bytes.slice(position..)).unwrap();
position += topic_id.get_size_bytes().as_bytes_usize();
- let partition_id = u32::from_le_bytes(bytes[position..position +
4].try_into().unwrap());
- let polling_kind = PollingKind::from_code(bytes[position +
4]).unwrap();
- position += 5;
+ let has_partition_id = bytes[position];
+ let partition_id = u32::from_le_bytes(bytes[position + 1..position +
5].try_into().unwrap());
+ let partition_id = if has_partition_id == 1 {
+ Some(partition_id)
+ } else {
+ None
+ };
+ let polling_kind = PollingKind::from_code(bytes[position +
5]).unwrap();
+ position += 6;
let value = u64::from_le_bytes(bytes[position..position +
8].try_into().unwrap());
let strategy = PollingStrategy {
kind: polling_kind,
@@ -270,7 +282,7 @@ mod tests {
assert_eq!(consumer, command.consumer);
assert_eq!(stream_id, command.stream_id);
assert_eq!(topic_id, command.topic_id);
- assert_eq!(Some(partition_id), command.partition_id);
+ assert_eq!(partition_id, command.partition_id);
assert_eq!(strategy, command.strategy);
assert_eq!(count, command.count);
assert_eq!(auto_commit, command.auto_commit);
@@ -291,7 +303,7 @@ mod tests {
let topic_id_bytes = topic_id.to_bytes();
let strategy_bytes = strategy.to_bytes();
let mut bytes = BytesMut::with_capacity(
- 9 + consumer_bytes.len()
+ 10 + consumer_bytes.len()
+ stream_id_bytes.len()
+ topic_id_bytes.len()
+ strategy_bytes.len(),
@@ -299,6 +311,7 @@ mod tests {
bytes.put_slice(&consumer_bytes);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
+ bytes.put_u8(1); // Flag: partition_id is Some
bytes.put_u32_le(partition_id);
bytes.put_slice(&strategy_bytes);
bytes.put_u32_le(count);
diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs
index 11f93194e..cc22f9439 100644
--- a/core/common/src/types/args/mod.rs
+++ b/core/common/src/types/args/mod.rs
@@ -408,11 +408,11 @@ impl Default for Args {
quic_max_concurrent_bidi_streams: 10000,
quic_datagram_send_buffer_size: 100000,
quic_initial_mtu: 1200,
- quic_send_window: 100000,
- quic_receive_window: 100000,
+ quic_send_window: 1000000,
+ quic_receive_window: 1000000,
quic_response_buffer_size: 1048576,
quic_keep_alive_interval: 5000,
- quic_max_idle_timeout: 10000,
+ quic_max_idle_timeout: 100000,
quic_validate_certificate: false,
quic_heartbeat_interval: "5s".to_string(),
websocket_server_address: "127.0.0.1:8092".to_string(),
diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs
index 455fec872..74e194198 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
use iggy::clients::client::IggyClient;
use iggy::prelude::*;
-const PARTITION_ID: u32 = 1;
+const PARTITION_ID: u32 = 0;
const STREAM_NAME: &str = "test-stream-producer";
const TOPIC_NAME: &str = "test-topic-producer";
const PARTITIONS_COUNT: u32 = 3;
diff --git a/core/integration/tests/server/scenarios/delete_segments_scenario.rs b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
index a35c5bf1e..a5959e5b5 100644
--- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs
+++ b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
@@ -20,11 +20,9 @@ use iggy::prelude::*;
use integration::test_server::{ClientFactory, TestServer};
use std::fs::{DirEntry, read_dir};
-const STREAM_ID: u32 = 1;
const STREAM_NAME: &str = "test_stream";
-const TOPIC_ID: u32 = 1;
const TOPIC_NAME: &str = "test_topic";
-const PARTITION_ID: u32 = 1;
+const PARTITION_ID: u32 = 0;
const LOG_EXTENSION: &str = "log";
pub async fn run(client_factory: &dyn ClientFactory, test_server: &TestServer)
{
@@ -36,13 +34,14 @@ pub async fn run(client_factory: &dyn ClientFactory,
test_server: &TestServer) {
.await
.unwrap();
- client.create_stream(STREAM_NAME).await.unwrap();
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
- client
+ let topic = client
.create_topic(
&Identifier::named(STREAM_NAME).unwrap(),
TOPIC_NAME,
- PARTITION_ID,
+ 1,
CompressionAlgorithm::None,
None,
IggyExpiry::NeverExpire,
@@ -50,6 +49,7 @@ pub async fn run(client_factory: &dyn ClientFactory,
test_server: &TestServer) {
)
.await
.unwrap();
+ let topic_id = topic.id;
// Send 5 large messages to create multiple segments
let large_payload = "A".repeat(1024 * 1024);
@@ -79,7 +79,7 @@ pub async fn run(client_factory: &dyn ClientFactory,
test_server: &TestServer) {
// Check initial segment count on filesystem
let data_path = test_server.get_local_data_path();
let partition_path =
- format!("{data_path}/streams/{STREAM_ID}/topics/{TOPIC_ID}/partitions/{PARTITION_ID}");
+ format!("{data_path}/streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}");
let initial_segments = get_segment_paths_for_partition(&partition_path);
println!(
diff --git a/core/integration/tests/server/scenarios/message_size_scenario.rs b/core/integration/tests/server/scenarios/message_size_scenario.rs
index a6430f59a..1eb3bf94d 100644
--- a/core/integration/tests/server/scenarios/message_size_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_size_scenario.rs
@@ -22,8 +22,6 @@ use integration::test_server::{ClientFactory,
assert_clean_system, login_root};
use std::collections::HashMap;
use std::str::FromStr;
-const STREAM_ID: u32 = 1;
-const TOPIC_ID: u32 = 1;
const STREAM_NAME: &str = "test-stream";
const TOPIC_NAME: &str = "test-topic";
const PARTITIONS_COUNT: u32 = 3;
@@ -93,8 +91,8 @@ async fn assert_message_count(client: &IggyClient,
expected_count: u32) {
// 4. Poll messages and validate the count
let polled_messages = client
.poll_messages(
- &STREAM_ID.try_into().unwrap(),
- &TOPIC_ID.try_into().unwrap(),
+ &STREAM_NAME.try_into().unwrap(),
+ &TOPIC_NAME.try_into().unwrap(),
Some(PARTITION_ID),
&Consumer::default(),
&PollingStrategy::offset(0),
@@ -114,7 +112,7 @@ async fn init_system(client: &IggyClient) {
// 2. Create the topic
client
.create_topic(
- &STREAM_ID.try_into().unwrap(),
+ &STREAM_NAME.try_into().unwrap(),
TOPIC_NAME,
PARTITIONS_COUNT,
Default::default(),
@@ -128,7 +126,7 @@ async fn init_system(client: &IggyClient) {
async fn cleanup_system(client: &IggyClient) {
client
- .delete_stream(&STREAM_ID.try_into().unwrap())
+ .delete_stream(&STREAM_NAME.try_into().unwrap())
.await
.unwrap();
}
@@ -179,8 +177,8 @@ async fn send_message_and_check_result(
let send_result = client
.send_messages(
- &STREAM_ID.try_into().unwrap(),
- &TOPIC_ID.try_into().unwrap(),
+ &STREAM_NAME.try_into().unwrap(),
+ &TOPIC_NAME.try_into().unwrap(),
&Partitioning::partition_id(PARTITION_ID),
&mut messages,
)
diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index 42d18859b..129f14177 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -20,9 +20,12 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload, ShardSendRequestResult};
use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
+use crate::streaming;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -45,31 +48,65 @@ impl ServerCommandHandler for DeleteSegments {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
+
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
- let partition_id = self.partition_id;
+ let partition_id = self.partition_id as usize;
+ let segments_count = self.segments_count;
- shard
- .delete_segments(
- session,
- &self.stream_id,
- &self.topic_id,
- self.partition_id as usize,
- self.segments_count,
- )
- .await
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to delete segments
for topic with ID: {topic_id} in stream with ID: {stream_id}, session:
{session}",
- )
- })?;
- let event = ShardEvent::DeletedSegments {
- stream_id: self.stream_id.clone(),
- topic_id: self.topic_id.clone(),
- partition_id: self.partition_id as usize,
- segments_count: self.segments_count,
- };
- shard.broadcast_event_to_all_shards(event).await?;
+ // Ensure authentication and topic exists
+ shard.ensure_authenticated(session)?;
+ shard.ensure_topic_exists(&stream_id, &topic_id)?;
+ shard.ensure_partition_exists(&stream_id, &topic_id, partition_id)?;
+
+ // Get numeric IDs for namespace
+ let numeric_stream_id = shard
+ .streams2
+ .with_stream_by_id(&stream_id,
streaming::streams::helpers::get_stream_id());
+
+ let numeric_topic_id = shard.streams2.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ streaming::topics::helpers::get_topic_id(),
+ );
+
+ // Route request to the correct shard
+ let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
+ let payload = ShardRequestPayload::DeleteSegments { segments_count };
+ let request = ShardRequest::new(stream_id.clone(), topic_id.clone(),
partition_id, payload);
+ let message = ShardMessage::Request(request);
+
+ match shard
+ .send_request_to_shard_or_recoil(Some(&namespace), message)
+ .await?
+ {
+ ShardSendRequestResult::Recoil(message) => {
+ if let
ShardMessage::Request(crate::shard::transmission::message::ShardRequest {
+ stream_id,
+ topic_id,
+ partition_id,
+ payload,
+ }) = message
+ && let ShardRequestPayload::DeleteSegments {
segments_count } = payload
+ {
+ shard
+ .delete_segments_base(&stream_id, &topic_id,
partition_id, segments_count)
+ .await
+ .with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to
delete segments for topic with ID: {topic_id} in stream with ID: {stream_id},
session: {session}",
+ )
+ })?;
+ } else {
+ return Err(IggyError::InvalidCommand);
+ }
+ }
+ ShardSendRequestResult::Response(response) => {
+ if !matches!(response, ShardResponse::DeleteSegments) {
+ return Err(IggyError::InvalidCommand);
+ }
+ }
+ }
shard
.state
@@ -83,6 +120,7 @@ impl ServerCommandHandler for DeleteSegments {
"{COMPONENT} (error: {error}) - failed to apply 'delete
segments' command for partition with ID: {partition_id} in topic with ID:
{topic_id} in stream with ID: {stream_id}, session: {session}",
)
})?;
+
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 0c64dfe77..bfb04506b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -388,7 +388,6 @@ impl IggyShard {
|(_, _, _, offset, .., log)| {
*log = loaded_log;
let current_offset =
log.active_segment().end_offset;
- tracing::warn!("loaded current_offset: {}",
current_offset);
offset.store(current_offset,
Ordering::Relaxed);
},
);
@@ -514,6 +513,11 @@ impl IggyShard {
.await?;
Ok(ShardResponse::FlushUnsavedBuffer)
}
+ ShardRequestPayload::DeleteSegments { segments_count } => {
+ self.delete_segments_base(&stream_id, &topic_id, partition_id,
segments_count)
+ .await?;
+ Ok(ShardResponse::DeleteSegments)
+ }
ShardRequestPayload::CreateStream { user_id, name } => {
assert_eq!(self.id, 0, "CreateStream should only be handled by
shard0");
@@ -833,21 +837,6 @@ impl IggyShard {
Ok(())
}
- ShardEvent::DeletedSegments {
- stream_id,
- topic_id,
- partition_id,
- segments_count,
- } => {
- self.delete_segments_bypass_auth(
- &stream_id,
- &topic_id,
- partition_id,
- segments_count,
- )
- .await?;
- Ok(())
- }
ShardEvent::FlushUnsavedBuffer {
stream_id,
topic_id,
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 62d3ef0fa..aa2c4fa51 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -17,7 +17,6 @@ use crate::streaming;
* specific language governing permissions and limitations
* under the License.
*/
-use crate::streaming::session::Session;
use iggy_common::Identifier;
use iggy_common::IggyError;
@@ -82,20 +81,20 @@ impl IggyShard {
let path = msg_writer.path();
drop(msg_writer);
drop(index_writer);
- compio::fs::remove_file(&path).await.map_err(|_| {
- tracing::error!("Failed to delete segment file at path:
{}", path);
+ compio::fs::remove_file(&path).await.map_err(|e| {
+ tracing::error!("Failed to delete segment file at path:
{}, err: {}", path, e);
IggyError::CannotDeleteFile
})?;
} else {
let start_offset = segment.start_offset;
- let path = self.config.system.get_segment_path(
+ let path = self.config.system.get_messages_file_path(
numeric_stream_id,
numeric_topic_id,
partition_id,
start_offset,
);
- compio::fs::remove_file(&path).await.map_err(|_| {
- tracing::error!("Failed to delete segment file at path:
{}", path);
+ compio::fs::remove_file(&path).await.map_err(|e| {
+ tracing::error!("Failed to delete segment file at path:
{}, err: {}", path, e);
IggyError::CannotDeleteFile
})?;
}
@@ -105,19 +104,4 @@ impl IggyShard {
stats.increment_segments_count(1);
Ok(())
}
-
- pub async fn delete_segments(
- &self,
- session: &Session,
- stream_id: &Identifier,
- topic_id: &Identifier,
- partition_id: usize,
- segments_count: u32,
- ) -> Result<(), IggyError> {
- // Assert authentication.
- self.ensure_authenticated(session)?;
- self.ensure_topic_exists(stream_id, topic_id)?;
- self.delete_segments_base(stream_id, topic_id, partition_id,
segments_count)
- .await
- }
}
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index 196c1edbb..e2ff1b0b1 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -218,233 +218,4 @@ impl IggyShard {
Ok(())
}
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::configs::server::ServerConfig;
- use crate::shard::ShardInfo;
- use crate::shard::transmission::connector::ShardConnector;
- use crate::slab::streams::Streams;
- use crate::slab::users::Users;
- use crate::state::file::FileState;
- use crate::streaming::persistence::persister::{FilePersister,
PersisterKind};
- use crate::streaming::session::Session;
- use crate::streaming::streams;
- use crate::streaming::users::user::User;
- use crate::streaming::utils::ptr::EternalPtr;
- use crate::versioning::SemanticVersion;
- use dashmap::DashMap;
- use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
- use std::net::{Ipv4Addr, SocketAddr};
- use std::rc::Rc;
- use std::sync::Arc;
-
- fn create_test_shard() -> Rc<IggyShard> {
- let tempdir = tempfile::TempDir::new().unwrap();
- let state_path = tempdir.path().join("state.log");
- let config = ServerConfig::default();
-
- let streams = Streams::default();
- let shards_table = Box::new(DashMap::new());
- let shards_table = Box::leak(shards_table);
- let shards_table:
EternalPtr<DashMap<crate::shard::namespace::IggyNamespace, ShardInfo>> =
- shards_table.into();
- let users = Users::new();
- let state = FileState::new(
- &state_path.to_string_lossy(),
- &SemanticVersion::current().unwrap(),
- Arc::new(PersisterKind::File(FilePersister)),
- None,
- );
- let connections = vec![ShardConnector::new(0)];
-
- let builder = IggyShard::builder();
- let shard = builder
- .id(0)
- .streams(streams)
- .shards_table(shards_table)
- .state(state)
- .users(users)
- .connections(connections)
- .config(config)
- .encryptor(None)
- .version(SemanticVersion::current().unwrap())
- .build();
-
- Rc::new(shard)
- }
-
- #[compio::test]
- async fn should_create_and_get_stream_by_id_and_name() {
- let shard = create_test_shard();
-
- let stream_name = "test_stream";
-
- // Initialize root user and session
- let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
- let root_id = shard.users.insert(root);
- let session = Session::new(
- 1,
- root_id as u32,
- SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234),
- );
- shard
- .permissioner
- .borrow_mut()
- .init(&[&User::root(DEFAULT_ROOT_USERNAME,
DEFAULT_ROOT_PASSWORD)]);
-
- // Create stream
- let stream = shard
- .create_stream2(&session, stream_name.to_string())
- .await
- .unwrap();
-
- let stream_id = stream.id();
- assert_eq!(stream.root().name(), stream_name);
-
- // Verify stream exists by ID
- assert!(
- shard
- .streams2
- .exists(&Identifier::numeric(stream_id as u32).unwrap())
- );
-
- // Verify stream exists by name
- assert!(
- shard
- .streams2
- .exists(&Identifier::from_str_value(stream_name).unwrap())
- );
-
- // Verify we can access stream data by ID
- let retrieved_name = shard.streams2.with_stream_by_id(
- &Identifier::numeric(stream_id as u32).unwrap(),
- streams::helpers::get_stream_name(),
- );
- assert_eq!(retrieved_name, stream_name);
-
- // Verify we can access stream data by name
- let retrieved_id = shard.streams2.with_stream_by_id(
- &Identifier::from_str_value(stream_name).unwrap(),
- streams::helpers::get_stream_id(),
- );
- assert_eq!(retrieved_id, stream_id);
- }
-
- #[compio::test]
- async fn should_update_stream_name() {
- let shard = create_test_shard();
-
- let initial_name = "initial_stream";
- let updated_name = "updated_stream";
-
- // Initialize root user and session
- let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
- let root_id = shard.users.insert(root);
- let session = Session::new(
- 1,
- root_id as u32,
- SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234),
- );
- shard
- .permissioner
- .borrow_mut()
- .init(&[&User::root(DEFAULT_ROOT_USERNAME,
DEFAULT_ROOT_PASSWORD)]);
-
- // Create stream
- let stream = shard
- .create_stream2(&session, initial_name.to_string())
- .await
- .unwrap();
-
- let stream_id = stream.id();
-
- // Update stream name
- shard
- .update_stream2(
- &session,
- &Identifier::numeric(stream_id as u32).unwrap(),
- updated_name.to_string(),
- )
- .unwrap();
-
- // Verify old name doesn't exist
- assert!(
- !shard
- .streams2
- .exists(&Identifier::from_str_value(initial_name).unwrap())
- );
-
- // Verify new name exists
- assert!(
- shard
- .streams2
- .exists(&Identifier::from_str_value(updated_name).unwrap())
- );
-
- // Verify stream data
- let retrieved_name = shard.streams2.with_stream_by_id(
- &Identifier::numeric(stream_id as u32).unwrap(),
- streams::helpers::get_stream_name(),
- );
- assert_eq!(retrieved_name, updated_name);
- }
-
- #[compio::test]
- async fn should_delete_stream() {
- let shard = create_test_shard();
-
- let stream_name = "to_be_deleted";
-
- // Initialize root user and session
- let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
- let root_id = shard.users.insert(root);
- let session = Session::new(
- 1,
- root_id as u32,
- SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234),
- );
- shard
- .permissioner
- .borrow_mut()
- .init(&[&User::root(DEFAULT_ROOT_USERNAME,
DEFAULT_ROOT_PASSWORD)]);
-
- // Create stream
- let stream = shard
- .create_stream2(&session, stream_name.to_string())
- .await
- .unwrap();
-
- let stream_id = stream.id();
-
- // Verify stream exists
- assert!(
- shard
- .streams2
- .exists(&Identifier::numeric(stream_id as u32).unwrap())
- );
-
- // Delete stream
- let deleted_stream = shard
- .delete_stream2(&session, &Identifier::numeric(stream_id as
u32).unwrap())
- .await
- .unwrap();
-
- assert_eq!(deleted_stream.id(), stream_id);
- assert_eq!(deleted_stream.root().name(), stream_name);
-
- // Verify stream no longer exists
- assert!(
- !shard
- .streams2
- .exists(&Identifier::numeric(stream_id as u32).unwrap())
- );
- assert!(
- !shard
- .streams2
- .exists(&Identifier::from_str_value(stream_name).unwrap())
- );
- }
-}
+}
\ No newline at end of file
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index 9e51a3ad1..abc4ea140 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -114,12 +114,6 @@ pub enum ShardEvent {
user_id: u32,
name: String,
},
- DeletedSegments {
- stream_id: Identifier,
- topic_id: Identifier,
- partition_id: usize,
- segments_count: u32,
- },
AddressBound {
protocol: TransportProtocol,
address: SocketAddr,
diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs
index 2bbd79b86..c07d95166 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -32,6 +32,7 @@ pub enum ShardResponse {
PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
SendMessages,
FlushUnsavedBuffer,
+ DeleteSegments,
Event,
CreateStreamResponse(stream2::Stream),
CreateTopicResponse(topic2::Topic),
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index a61a45052..ced203b2a 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -101,6 +101,9 @@ pub enum ShardRequestPayload {
GetStats {
user_id: u32,
},
+ DeleteSegments {
+ segments_count: u32,
+ },
}
impl From<ShardRequest> for ShardMessage {