This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 235a7cc7263cc1d4bd2efed0d6a0847fd17ab344 Author: Grzegorz Koszyk <[email protected]> AuthorDate: Sun Jun 29 22:49:43 2025 +0200 feat(io_uring): fix send/poll (#1930) --- Cargo.lock | 12 ++ core/integration/tests/streaming/get_by_offset.rs | 3 +- .../tests/streaming/get_by_timestamp.rs | 4 +- core/integration/tests/streaming/messages.rs | 6 +- core/integration/tests/streaming/partition.rs | 15 +-- core/integration/tests/streaming/stream.rs | 6 +- core/integration/tests/streaming/topic.rs | 29 ++-- core/integration/tests/streaming/topic_messages.rs | 25 ++-- core/server/Cargo.toml | 2 +- .../handlers/messages/poll_messages_handler.rs | 67 ++++----- .../handlers/messages/send_messages_handler.rs | 64 ++++----- .../handlers/streams/create_stream_handler.rs | 12 +- .../binary/handlers/topics/create_topic_handler.rs | 18 ++- .../binary/handlers/users/login_user_handler.rs | 18 ++- core/server/src/bootstrap.rs | 11 +- core/server/src/configs/displays.rs | 5 +- core/server/src/io/file.rs | 9 +- core/server/src/main.rs | 35 ++--- core/server/src/shard/builder.rs | 2 +- core/server/src/shard/mod.rs | 149 ++++++++++++++++----- core/server/src/shard/system/messages.rs | 4 +- core/server/src/shard/system/streams.rs | 41 ++++-- core/server/src/shard/system/topics.rs | 98 ++++++++++++-- core/server/src/shard/system/users.rs | 15 +++ core/server/src/shard/tasks/messages.rs | 38 ++++++ core/server/src/shard/tasks/mod.rs | 1 + core/server/src/shard/transmission/event.rs | 29 ++-- core/server/src/shard/transmission/frame.rs | 7 +- core/server/src/shard/transmission/message.rs | 40 ++++-- core/server/src/shard/transmission/mod.rs | 2 +- core/server/src/state/file.rs | 6 +- .../src/streaming/segments/indexes/index_reader.rs | 20 ++- .../src/streaming/segments/indexes/index_writer.rs | 18 ++- .../src/streaming/segments/indexes/indexes_mut.rs | 7 +- core/server/src/streaming/segments/messages/mod.rs | 64 ++------- core/server/src/streaming/segments/segment.rs | 3 +- 
.../streaming/segments/types/messages_batch_mut.rs | 4 + .../src/streaming/segments/writing_messages.rs | 3 +- core/server/src/streaming/streams/storage.rs | 17 +-- core/server/src/streaming/topics/messages.rs | 2 +- core/server/src/streaming/topics/storage.rs | 15 ++- core/server/src/streaming/utils/memory_pool.rs | 3 +- core/server/src/streaming/utils/pooled_buffer.rs | 12 +- core/server/src/tcp/connection_handler.rs | 10 +- core/server/src/tcp/sender.rs | 8 +- core/server/src/tcp/tcp_listener.rs | 23 ++-- core/server/src/tcp/tcp_server.rs | 9 +- core/server/src/tcp/tcp_socket.rs | 15 ++- 48 files changed, 631 insertions(+), 375 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f06db7bf..8a3b9e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4875,6 +4875,7 @@ checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a" dependencies = [ "auto-const-array", "bytes", + "flume", "fxhash", "io-uring", "libc", @@ -4882,8 +4883,10 @@ dependencies = [ "mio 0.8.11", "monoio-macros", "nix 0.26.4", + "once_cell", "pin-project-lite", "socket2", + "threadpool", "windows-sys 0.48.0", ] @@ -7554,6 +7557,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "time" version = "0.3.41" diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs index df9f8f40..e5cc2a33 100644 --- a/core/integration/tests/streaming/get_by_offset.rs +++ b/core/integration/tests/streaming/get_by_offset.rs @@ -127,8 +127,7 @@ async fn test_get_messages_by_offset( Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); diff --git 
a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs index 93a178aa..7aff4a89 100644 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ b/core/integration/tests/streaming/get_by_timestamp.rs @@ -18,7 +18,6 @@ use crate::streaming::common::test_setup::TestSetup; use bytes::BytesMut; -use iggy::prelude::Confirmation; use iggy::prelude::*; use server::configs::cache_indexes::CacheIndexesConfig; use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig}; @@ -129,8 +128,7 @@ async fn test_get_messages_by_timestamp( Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); diff --git a/core/integration/tests/streaming/messages.rs b/core/integration/tests/streaming/messages.rs index 6ab6ca5b..c6363d9a 100644 --- a/core/integration/tests/streaming/messages.rs +++ b/core/integration/tests/streaming/messages.rs @@ -58,8 +58,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); let mut messages = Vec::with_capacity(messages_count as usize); let mut appended_messages = Vec::with_capacity(messages_count as usize); @@ -126,8 +125,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), now, - ) - .await; + ); let partition_state = PartitionState { id: partition.partition_id, created_at: now, diff --git a/core/integration/tests/streaming/partition.rs b/core/integration/tests/streaming/partition.rs index 4fff0d04..6d811e59 100644 --- a/core/integration/tests/streaming/partition.rs +++ b/core/integration/tests/streaming/partition.rs @@ -49,8 +49,7 @@ async fn should_persist_partition_with_segment() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), 
IggyTimestamp::now(), - ) - .await; + ); partition.persist().await.unwrap(); @@ -81,8 +80,7 @@ async fn should_load_existing_partition_from_disk() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); partition.persist().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; @@ -101,8 +99,7 @@ async fn should_load_existing_partition_from_disk() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), now, - ) - .await; + ); let partition_state = PartitionState { id: partition.partition_id, created_at: now, @@ -151,8 +148,7 @@ async fn should_delete_existing_partition_from_disk() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); partition.persist().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; @@ -185,8 +181,7 @@ async fn should_purge_existing_partition_on_disk() { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); partition.persist().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; let messages = create_messages(); diff --git a/core/integration/tests/streaming/stream.rs b/core/integration/tests/streaming/stream.rs index 5b340750..677e2d32 100644 --- a/core/integration/tests/streaming/stream.rs +++ b/core/integration/tests/streaming/stream.rs @@ -145,10 +145,10 @@ async fn should_purge_existing_stream_on_disk() { let topic = stream .get_topic(&Identifier::numeric(topic_id).unwrap()) .unwrap(); - topic - .append_messages(&Partitioning::partition_id(1), batch) - .await + let partition_id = topic + .calculate_partition_id(&Partitioning::partition_id(1)) .unwrap(); + topic.append_messages(partition_id, batch).await.unwrap(); let (_, loaded_messages) = topic .get_messages( PollingConsumer::Consumer(1, 1), diff --git a/core/integration/tests/streaming/topic.rs b/core/integration/tests/streaming/topic.rs 
index f75ffe2b..e3d17df2 100644 --- a/core/integration/tests/streaming/topic.rs +++ b/core/integration/tests/streaming/topic.rs @@ -38,7 +38,7 @@ async fn should_persist_topics_with_partitions_directories_and_info_file() { let topic_ids = get_topic_ids(); for topic_id in topic_ids { let name = format!("test-{topic_id}"); - let topic = Topic::create( + let created_topic_info = Topic::create( stream_id, topic_id, &name, @@ -53,8 +53,8 @@ async fn should_persist_topics_with_partitions_directories_and_info_file() { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); + let topic = created_topic_info.topic; topic.persist().await.unwrap(); @@ -76,7 +76,7 @@ async fn should_load_existing_topic_from_disk() { let topic_ids = get_topic_ids(); for topic_id in topic_ids { let name = format!("test-{topic_id}"); - let topic = Topic::create( + let created_topic_info = Topic::create( stream_id, topic_id, &name, @@ -91,8 +91,9 @@ async fn should_load_existing_topic_from_disk() { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); + let topic = created_topic_info.topic; + topic.persist().await.unwrap(); assert_persisted_topic( &topic.path, @@ -102,7 +103,7 @@ async fn should_load_existing_topic_from_disk() { .await; let created_at = IggyTimestamp::now(); - let mut loaded_topic = Topic::empty( + let created_topic_info = Topic::empty( stream_id, topic_id, &name, @@ -111,8 +112,8 @@ async fn should_load_existing_topic_from_disk() { Arc::new(AtomicU32::new(0)), setup.config.clone(), setup.storage.clone(), - ) - .await; + ); + let mut loaded_topic = created_topic_info.topic; let topic_state = TopicState { id: topic_id, name, @@ -153,7 +154,7 @@ async fn should_delete_existing_topic_from_disk() { let topic_ids = get_topic_ids(); for topic_id in topic_ids { let name = format!("test-{topic_id}"); - let topic = Topic::create( + let created_topic_info = Topic::create( stream_id, topic_id, &name, @@ -168,8 +169,8 @@ async fn should_delete_existing_topic_from_disk() { 
MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); + let topic = created_topic_info.topic; topic.persist().await.unwrap(); assert_persisted_topic( &topic.path, @@ -193,7 +194,7 @@ async fn should_purge_existing_topic_on_disk() { let topic_ids = get_topic_ids(); for topic_id in topic_ids { let name = format!("test-{topic_id}"); - let topic = Topic::create( + let created_topic_info = Topic::create( stream_id, topic_id, &name, @@ -208,8 +209,8 @@ async fn should_purge_existing_topic_on_disk() { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); + let topic = created_topic_info.topic; topic.persist().await.unwrap(); assert_persisted_topic( &topic.path, @@ -225,10 +226,10 @@ async fn should_purge_existing_topic_on_disk() { .map(|msg| msg.get_size_bytes().as_bytes_u32()) .sum::<u32>(); let batch = IggyMessagesBatchMut::from_messages(&messages, batch_size); - topic - .append_messages(&Partitioning::partition_id(1), batch) - .await + let partition_id = topic + .calculate_partition_id(&Partitioning::partition_id(1)) .unwrap(); + topic.append_messages(partition_id, batch).await.unwrap(); let (_, loaded_messages) = topic .get_messages( PollingConsumer::Consumer(1, 1), diff --git a/core/integration/tests/streaming/topic_messages.rs b/core/integration/tests/streaming/topic_messages.rs index 19de255f..3e5362ac 100644 --- a/core/integration/tests/streaming/topic_messages.rs +++ b/core/integration/tests/streaming/topic_messages.rs @@ -62,7 +62,8 @@ async fn assert_polling_messages() { .map(|m| m.get_size_bytes()) .sum::<IggyByteSize>(); let batch = IggyMessagesBatchMut::from_messages(&messages, batch_size.as_bytes_u32()); - topic.append_messages(&partitioning, batch).await.unwrap(); + let partition_id = topic.calculate_partition_id(&partitioning).unwrap(); + topic.append_messages(partition_id, batch).await.unwrap(); let consumer = PollingConsumer::Consumer(1, partition_id); let (_, polled_messages) = topic @@ -106,10 +107,8 @@ async fn 
given_key_none_messages_should_be_appended_to_the_next_partition_using_ .expect("Failed to create message")], batch_size.as_bytes_u32(), ); - topic - .append_messages(&partitioning, messages) - .await - .unwrap(); + let partition_id = topic.calculate_partition_id(&partitioning).unwrap(); + topic.append_messages(partition_id, messages).await.unwrap(); } for i in 1..=partitions_count { assert_messages(&topic, i, messages_per_partition_count).await; @@ -135,10 +134,8 @@ async fn given_key_partition_id_messages_should_be_appended_to_the_chosen_partit .expect("Failed to create message")], batch_size.as_bytes_u32(), ); - topic - .append_messages(&partitioning, messages) - .await - .unwrap(); + let partition_id = topic.calculate_partition_id(&partitioning).unwrap(); + topic.append_messages(partition_id, messages).await.unwrap(); } for i in 1..=partitions_count { @@ -168,10 +165,8 @@ async fn given_key_messages_key_messages_should_be_appended_to_the_calculated_pa .expect("Failed to create message")], batch_size.as_bytes_u32(), ); - topic - .append_messages(&partitioning, messages) - .await - .unwrap(); + let partition_id = topic.calculate_partition_id(&partitioning).unwrap(); + topic.append_messages(partition_id, messages).await.unwrap(); } let mut messages_count_per_partition = HashMap::new(); @@ -212,7 +207,7 @@ async fn init_topic(setup: &TestSetup, partitions_count: u32) -> Topic { setup.create_topics_directory(stream_id).await; let id = 2; let name = "test"; - let topic = Topic::create( + let created_topic_info = Topic::create( stream_id, id, name, @@ -227,8 +222,8 @@ async fn init_topic(setup: &TestSetup, partitions_count: u32) -> Topic { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); + let topic = created_topic_info.topic; topic.persist().await.unwrap(); topic } diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3b9694f5..8da2f495 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -66,7 +66,7 @@ lending-iterator = "0.1.7" 
hash32 = "1.0.0" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } -monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat"] } +monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat", "sync"] } monoio-native-tls = "0.4.0" nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index c590dcf2..3abfb6a1 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -21,10 +21,10 @@ use crate::binary::handlers::messages::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; use crate::shard::namespace::IggyNamespace; +use crate::shard::system::messages::PollingArgs; use crate::shard::transmission::frame::ShardResponse; -use crate::shard::transmission::message::{ShardMessage, ShardRequest}; +use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}; use crate::shard::{IggyShard, ShardRequestResult}; -use crate::shard::system::messages::PollingArgs; use crate::streaming::segments::IggyMessagesBatchSet; use crate::streaming::session::Session; use crate::to_iovec; @@ -112,53 +112,44 @@ impl ServerCommandHandler for PollMessages { }; let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); - let request = ShardRequest::PollMessages { - consumer, - partition_id, + let payload = ShardRequestPayload::PollMessages { + consumer: consumer.clone(), args, count, }; + let request = ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload); let message = ShardMessage::Request(request); let (metadata, batch) = match shard.send_request_to_shard(&namespace, message).await { - ShardRequestResult::SameShard(message) => { - 
match message { - ShardMessage::Request(request) => { - match request { - ShardRequest::PollMessages { - consumer, - partition_id, - args, - count, - } => { - topic.get_messages(consumer, partition_id, args.strategy, count).await? - - } - _ => unreachable!( - "Expected a SendMessages request inside of SendMessages handler, impossible state" - ), - } - } - _ => unreachable!( - "Expected a request message inside of an command handler, impossible state" - ), - } - } - ShardRequestResult::Result(result) => { - match result? { - ShardResponse::PollMessages(response) => { - response - } - ShardResponse::ErrorResponse(err) => { - return Err(err); + ShardRequestResult::SameShard(message) => match message { + ShardMessage::Request(request) => match request.payload { + ShardRequestPayload::PollMessages { + consumer, + args, + count, + } => { + topic + .get_messages(consumer, partition_id, args.strategy, count) + .await? } _ => unreachable!( - "Expected a PollMessages response inside of PollMessages handler, impossible state" + "Expected a SendMessages request inside of SendMessages handler, impossible state" ), + }, + _ => unreachable!( + "Expected a request message inside of an command handler, impossible state" + ), + }, + ShardRequestResult::Result(result) => match result? { + ShardResponse::PollMessages(response) => response, + ShardResponse::ErrorResponse(err) => { + return Err(err); } - } + _ => unreachable!( + "Expected a PollMessages response inside of PollMessages handler, impossible state" + ), + }, }; - // Collect all chunks first into a Vec to extend their lifetimes. 
// This ensures the Bytes (in reality Arc<[u8]>) references from each IggyMessagesBatch stay alive // throughout the async vectored I/O operation, preventing "borrowed value does not live diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 61e2c39c..f4497bf6 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -21,7 +21,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::shard::namespace::IggyNamespace; use crate::shard::transmission::frame::ShardResponse; -use crate::shard::transmission::message::{ShardMessage, ShardRequest}; +use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}; use crate::shard::{IggyShard, ShardRequestResult}; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; @@ -150,12 +150,8 @@ impl ServerCommandHandler for SendMessages { let batch = shard.maybe_encrypt_messages(batch)?; let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); - let request = ShardRequest::SendMessages { - stream_id, - topic_id, - partition_id, - batch, - }; + let payload = ShardRequestPayload::SendMessages { batch }; + let request = ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload); let message = ShardMessage::Request(request); // Egh... I don't like those nested match statements, // Technically there is only two `request` types that will ever be dispatched @@ -164,43 +160,31 @@ impl ServerCommandHandler for SendMessages { // how to make this code reusable, in a sense that we will have exactly the same code inside of // `PollMessages` handler, but with different request.... 
match shard.send_request_to_shard(&namespace, message).await { - ShardRequestResult::SameShard(message) => { - match message { - ShardMessage::Request(request) => { - match request { - ShardRequest::SendMessages { - stream_id, - topic_id, - partition_id, - batch, - } => { - // Just shut up rust analyzer. - let _stream_id = stream_id; - let _topic_id = topic_id; - topic.append_messages(partition_id, batch).await? - } - _ => unreachable!( - "Expected a SendMessages request inside of SendMessages handler, impossible state" - ), + ShardRequestResult::SameShard(message) => match message { + ShardMessage::Request(request) => { + let partition_id = request.partition_id; + match request.payload { + ShardRequestPayload::SendMessages { batch } => { + topic.append_messages(partition_id, batch).await? } + _ => unreachable!( + "Expected a SendMessages request inside of SendMessages handler, impossible state" + ), } - _ => unreachable!( - "Expected a request message inside of an command handler, impossible state" - ), } - } - ShardRequestResult::Result(result) => { - match result? { - ShardResponse::SendMessages => { - () - } - ShardResponse::ErrorResponse(err) => { - return Err(err); - } - _ => unreachable!("Expected a SendMessages response inside of SendMessages handler, impossible state"), + _ => unreachable!( + "Expected a request message inside of an command handler, impossible state" + ), + }, + ShardRequestResult::Result(result) => match result? 
{ + ShardResponse::SendMessages => (), + ShardResponse::ErrorResponse(err) => { + return Err(err); } - - } + _ => unreachable!( + "Expected a SendMessages response inside of SendMessages handler, impossible state" + ), + }, }; sender.send_empty_ok_response().await?; Ok(()) diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs index a00afde7..0e746b76 100644 --- a/core/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs @@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::state::models::CreateStreamWithId; use crate::streaming::session::Session; @@ -46,28 +47,31 @@ impl ServerCommandHandler for CreateStream { ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id; - + let name = self.name.clone(); let created_stream_id = shard - .create_stream(session, self.stream_id, &self.name) + .create_stream(session, stream_id, &name) .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to create stream with id: {stream_id:?}, session: {session}" ) })?; + let event = ShardEvent::CreatedStream { stream_id, name }; + // Broadcast the event to all shards. 
+ let _responses = shard.broadcast_event_to_all_shards(event.into()); + let stream = shard.find_stream(session, &created_stream_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to find created stream with id: {created_stream_id:?}, session: {session}" ) })?; - let stream_id = stream.stream_id; let response = mapper::map_stream(&stream); shard .state .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId { - stream_id, + stream_id: stream.stream_id, command: self })) .await .with_error_context(|error| { diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs index db09863e..6785e4f0 100644 --- a/core/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs @@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::state::models::CreateTopicWithId; use crate::streaming::session::Session; @@ -62,14 +63,27 @@ impl ServerCommandHandler for CreateTopic { .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to create topic for stream_id: {stream_id}, topic_id: {topic_id:?}" ))?; + let event = ShardEvent::CreatedTopic { + stream_id: stream_id.clone(), + topic_id, + name: self.name.clone(), + partitions_count: self.partitions_count, + message_expiry: self.message_expiry, + compression_algorithm: self.compression_algorithm, + max_topic_size: self.max_topic_size, + replication_factor: self.replication_factor, + }; + // Broadcast the event to all shards. 
+ let _responses = shard.broadcast_event_to_all_shards(event.into()); + let stream = shard - .find_stream(session, &self.stream_id) + .get_stream(&self.stream_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get stream for stream_id: {stream_id}" ) })?; - let topic = shard.find_topic(session, &stream, &created_topic_id) + let topic = stream.get_topic(&created_topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get topic with ID: {created_topic_id} in stream with ID: {stream_id}" diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs index 21cbba49..b6b163b6 100644 --- a/core/server/src/binary/handlers/users/login_user_handler.rs +++ b/core/server/src/binary/handlers/users/login_user_handler.rs @@ -23,6 +23,7 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::session::Session; use anyhow::Result; use error_set::ErrContext; @@ -39,19 +40,30 @@ impl ServerCommandHandler for LoginUser { async fn handle( self, sender: &mut SenderKind, - length: u32, + _length: u32, session: &Rc<Session>, shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); + let LoginUser { + username, password, .. + } = self; let user = shard - .login_user(&self.username, &self.password, Some(session)) + .login_user(&username, &password, Some(session)) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to login user with name: {}, session: {session}", - self.username + username ) })?; + let event = ShardEvent::LoginUser { + client_id: session.client_id, + username, + password, + }; + // Broadcast the event to all shards. 
+ let _responses = shard.broadcast_event_to_all_shards(event.into()); + let identity_info = mapper::map_identity_info(user.id); sender.send_ok_response(&identity_info).await?; Ok(()) diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index ebf8946e..2d9f7eaa 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -16,6 +16,7 @@ use crate::{ streaming::{ persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, users::user::User, + utils::file::overwrite, }, }; use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc}; @@ -47,6 +48,12 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() { return Err(IggyError::CannotCreateStateDirectory(state_path)); } + let state_log = config.get_state_messages_file_path(); + if !Path::new(&state_log).exists() { + if let Err(_) = overwrite(&state_log).await { + return Err(IggyError::CannotCreateStateDirectory(state_log)); + } + } let streams_path = config.get_streams_path(); if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() { @@ -68,10 +75,6 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> config.get_system_path() ); Ok(()) - - // TODO: Move this to individual shard level - /* - */ } pub fn create_root_user() -> User { diff --git a/core/server/src/configs/displays.rs b/core/server/src/configs/displays.rs index 51873421..b65804b5 100644 --- a/core/server/src/configs/displays.rs +++ b/core/server/src/configs/displays.rs @@ -290,10 +290,7 @@ impl Display for SegmentConfig { write!( f, "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, archive_expired: {} }}", - self.size, - self.cache_indexes, - self.message_expiry, - self.archive_expired, + self.size, self.cache_indexes, self.message_expiry, self.archive_expired, ) } } diff --git a/core/server/src/io/file.rs 
b/core/server/src/io/file.rs index 61e51023..95129aa8 100644 --- a/core/server/src/io/file.rs +++ b/core/server/src/io/file.rs @@ -94,14 +94,9 @@ impl AsyncWriteRent for IggyFile { (Ok(n), buf) } + // This is bait!!!! async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> { - let slice = match IoVecWrapper::new(buf_vec) { - Ok(slice) => slice, - Err(buf) => return (Ok(0), buf), - }; - - let (result, slice) = self.write(slice).await; - (result, slice.into_inner()) + unimplemented!("writev is not implemented for IggyFile"); } async fn flush(&mut self) -> std::io::Result<()> { diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 436b3829..5875e6bd 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -26,6 +26,7 @@ use dotenvy::dotenv; use error_set::ErrContext; use figlet_rs::FIGfont; use iggy_common::create_user::CreateUser; +use iggy_common::defaults::DEFAULT_ROOT_USER_ID; use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError}; use server::args::Args; use server::bootstrap::{ @@ -46,6 +47,7 @@ use server::state::command::EntryCommand; use server::state::file::FileState; use server::state::models::CreateUserWithId; use server::state::system::SystemState; +use server::streaming::utils::{MemoryPool, crypto}; use server::versioning::SemanticVersion; use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str}; use tokio::time::Instant; @@ -117,21 +119,27 @@ fn main() -> Result<(), ServerError> { let mut logging = Logging::new(config.telemetry.clone()); logging.early_init(); + // From this point on, we can use tracing macros to log messages. 
+ logging.late_init(config.system.get_system_path(), &config.system.logging)?; + // TODO: Make this configurable from config as a range // for example this instance of Iggy will use cores from 0..4 let available_cpus = available_parallelism().expect("Failed to get num of cores"); let shards_count = available_cpus.into(); let shards_set = 0..shards_count; let connections = create_shard_connections(shards_set.clone()); + let gate = Arc::new(Gate::new()); + let mut handles = Vec::with_capacity(shards_set.len()); for shard_id in shards_set { - let gate: Arc<Gate<()>> = Arc::new(Gate::new()); let id = shard_id as u16; + let gate = gate.clone(); let connections = connections.clone(); let config = config.clone(); let state_persister = resolve_persister(config.system.state.enforce_fsync); - std::thread::Builder::new() + let handle = std::thread::Builder::new() .name(format!("shard-{id}")) .spawn(move || { + MemoryPool::init_pool(config.system.clone()); monoio::utils::bind_to_cpu_set(Some(shard_id)) .expect(format!("Failed to set CPU affinity for shard-{id}").as_str()); @@ -159,7 +167,6 @@ fn main() -> Result<(), ServerError> { // We can't use std::sync::Once because it doesn't support async. // Trait bound on the closure is FnOnce. - let gate = gate.clone(); // Peak into the state to check if the root user exists. // If it does not exist, create it. 
gate.with_async::<Result<(), IggyError>>(async |gate_state| { @@ -180,9 +187,7 @@ fn main() -> Result<(), ServerError> { entry .command() .and_then(|command| match command { - EntryCommand::CreateUser(payload) - if payload.command.username - == IGGY_ROOT_USERNAME_ENV && payload.command.password == IGGY_ROOT_PASSWORD_ENV => + EntryCommand::CreateUser(payload) if payload.user_id == DEFAULT_ROOT_USER_ID => { Ok(true) } @@ -238,19 +243,20 @@ fn main() -> Result<(), ServerError> { .into(); //TODO: If one of the shards fails to initialize, we should crash the whole program; - shard.run().await.expect("Failed to run shard"); + if let Err(e) = shard.run().await { + error!("Failed to run shard-{id}: {e}"); + } //TODO: If one of the shards fails to initialize, we should crash the whole program; - shard.assert_init(); - let shard = Rc::new(shard); + //shard.assert_init(); }) }) - .expect(format!("Failed to spawn thread for shard-{id}").as_str()) - .join() - .expect(format!("Failed to join thread for shard-{id}").as_str()); + .expect(format!("Failed to spawn thread for shard-{id}").as_str()); + handles.push(handle); } - // From this point on, we can use tracing macros to log messages. 
- logging.late_init(config.system.get_system_path(), &config.system.logging)?; + handles.into_iter().for_each(|handle| { + handle.join().expect("Failed to join shard thread"); + }); /* #[cfg(feature = "disable-mimalloc")] @@ -260,7 +266,6 @@ fn main() -> Result<(), ServerError> { #[cfg(not(feature = "disable-mimalloc"))] info!("Using mimalloc allocator"); - MemoryPool::init_pool(config.system.clone()); let system = SharedSystem::new(System::new( config.system.clone(), diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index a2a3fdfb..beda9ae2 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -113,7 +113,7 @@ impl IggyShardBuilder { version: version, stop_receiver: stop_receiver, stop_sender: stop_sender, - frame_receiver: Cell::new(Some(frame_receiver)), + messages_receiver: Cell::new(Some(frame_receiver)), metrics: Metrics::init(), users: Default::default(), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 7976cca3..cfbd6eac 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -20,13 +20,14 @@ pub mod builder; pub mod gate; pub mod namespace; pub mod system; +pub mod tasks; pub mod transmission; use ahash::{AHashMap, AHashSet, HashMap}; use builder::IggyShardBuilder; use error_set::ErrContext; use futures::future::try_join_all; -use iggy_common::{EncryptorKind, IggyError, UserId}; +use iggy_common::{EncryptorKind, Identifier, IggyError, UserId}; use namespace::IggyNamespace; use std::{ cell::{Cell, RefCell}, @@ -46,23 +47,29 @@ use crate::{ configs::server::ServerConfig, shard::{ system::info::SystemInfo, + tasks::messages::spawn_shard_message_task, transmission::{ + event::ShardEvent, frame::{ShardFrame, ShardResponse}, - message::{ShardEvent, ShardMessage, ShardRequest}, + message::{ShardMessage, ShardRequest, ShardRequestPayload}, }, }, state::{ - file::FileState, system::{StreamState, SystemState, UserState}, StateKind + StateKind, + 
file::FileState, + system::{StreamState, SystemState, UserState}, }, streaming::{ clients::client_manager::ClientManager, diagnostics::metrics::Metrics, + partitions::partition, personal_access_tokens::personal_access_token::PersonalAccessToken, session::Session, storage::SystemStorage, streams::stream::Stream, users::{permissioner::Permissioner, user::User}, }, + tcp::tcp_server::spawn_tcp_server, versioning::SemanticVersion, }; @@ -90,16 +97,15 @@ impl Shard { .sender .send(ShardFrame::new(message, Some(sender.clone()))); // Apparently sender needs to be cloned, otherwise channel will close... //TODO: Fixme - let response = receiver.recv().await - .map_err(|err| { - error!("Failed to receive response from shard: {err}"); - IggyError::ShardCommunicationError(self.id) - })?; + let response = receiver.recv().await.map_err(|err| { + error!("Failed to receive response from shard: {err}"); + IggyError::ShardCommunicationError(self.id) + })?; Ok(response) } } -struct ShardInfo { +pub struct ShardInfo { id: u16, } @@ -135,7 +141,7 @@ pub struct IggyShard { pub(crate) users: RefCell<HashMap<UserId, User>>, pub(crate) metrics: Metrics, - pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>, + pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>, stop_receiver: StopReceiver, stop_sender: StopSender, } @@ -172,17 +178,16 @@ impl IggyShard { // have the correct statistics when the server starts. //self.get_stats().await?; self.init().await?; - self.assert_init(); + // TODO: Fixme + //self.assert_init(); info!("Initiated shard with ID: {}", self.id); // Create all tasks (tcp listener, http listener, command processor, in the future also the background jobs). 
- /* - let mut tasks: Vec<Task> = vec![Box::pin(spawn_shard_message_task(shard.clone()))]; + let mut tasks: Vec<Task> = vec![Box::pin(spawn_shard_message_task(self.clone()))]; if self.config.tcp.enabled { tasks.push(Box::pin(spawn_tcp_server(self.clone()))); } let result = try_join_all(tasks).await; result?; - */ Ok(()) } @@ -427,6 +432,7 @@ impl IggyShard { #[instrument(skip_all, name = "trace_shutdown")] pub async fn shutdown(&mut self) -> Result<(), IggyError> { + //TODO: Fixme, impl cooperative shutdown. self.persist_messages().await?; Ok(()) } @@ -449,6 +455,81 @@ impl IggyShard { self.shards.len() as u32 } + pub async fn handle_shard_message(&self, message: ShardMessage) -> Option<ShardResponse> { + match message { + ShardMessage::Request(request) => match self.handle_request(request).await { + Ok(response) => Some(response), + Err(err) => Some(ShardResponse::ErrorResponse(err)), + }, + ShardMessage::Event(event) => match self.handle_event(event).await { + Ok(_) => Some(ShardResponse::Event), + Err(err) => Some(ShardResponse::ErrorResponse(err)), + }, + } + } + + async fn handle_request(&self, request: ShardRequest) -> Result<ShardResponse, IggyError> { + let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?; + let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?; + let partition_id = request.partition_id; + match request.payload { + ShardRequestPayload::SendMessages { batch } => { + topic.append_messages(partition_id, batch).await?; + Ok(ShardResponse::SendMessages) + } + ShardRequestPayload::PollMessages { + args, + consumer, + count, + } => { + let (metadata, batch) = topic + .get_messages(consumer, partition_id, args.strategy, count) + .await?; + Ok(ShardResponse::PollMessages((metadata, batch))) + } + } + } + + async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(), IggyError> { + match &*event { + ShardEvent::CreatedStream { stream_id, name } => { + self.create_stream_bypass_auth(*stream_id, name).await + } 
+ ShardEvent::CreatedTopic { + stream_id, + topic_id, + name, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + } => { + self.create_topic_bypass_auth( + stream_id, + *topic_id, + name, + *partitions_count, + *message_expiry, + *compression_algorithm, + *max_topic_size, + *replication_factor, + ) + .await + } + ShardEvent::LoginUser { + client_id, + username, + password, + } => self.login_user_event(*client_id, username, password), + ShardEvent::NewSession { address, transport } => { + let session = self.add_client(address, *transport); + self.add_active_session(session); + Ok(()) + } + } + } + pub async fn send_request_to_shard( &self, namespace: &IggyNamespace, @@ -479,6 +560,25 @@ impl IggyShard { } } + pub fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) -> Vec<ShardResponse> { + self.shards + .iter() + .filter_map(|shard| { + if shard.id != self.id { + Some(shard.connection.clone()) + } else { + None + } + }) + .map(|conn| { + // TODO: Fixme, maybe we should send response_sender + // and propagate errors back. 
+ conn.send(ShardFrame::new(event.clone().into(), None)); + ShardResponse::Event + }) + .collect() + } + fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> { let shards_table = self.shards_table.borrow(); shards_table.get(namespace).map(|shard_info| { @@ -496,27 +596,6 @@ impl IggyShard { self.shards_table.borrow_mut().extend(records); } - pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: ShardEvent) { - self.shards - .iter() - .filter_map(|shard| { - if shard.id != self.id { - Some(shard.connection.clone()) - } else { - None - } - }) - .map(|conn| { - //TODO: Fixme - /* - let message = ShardMessage::Event(event); - conn.send(ShardFrame::new(client_id, message, None)); - */ - () - }) - .collect::<Vec<_>>(); - } - pub fn add_active_session(&self, session: Rc<Session>) { self.active_sessions.borrow_mut().push(session); } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 76337a1a..36ae234b 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -25,8 +25,8 @@ use crate::streaming::utils::PooledBuffer; use async_zip::tokio::read::stream; use error_set::ErrContext; use iggy_common::{ - BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, - IggyError, Partitioning, PollingStrategy, + BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, + Partitioning, PollingStrategy, }; use tracing::{error, trace}; diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index fecec536..47068f11 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -186,6 +186,19 @@ impl IggyShard { })) } + pub async fn create_stream_bypass_auth( + &self, + stream_id: Option<u32>, + name: &str, + ) -> Result<(), IggyError> { + let stream = self.create_stream_base(stream_id, name)?; + let id = stream.stream_id; + 
self.streams_ids.borrow_mut().insert(name.to_owned(), id); + self.streams.borrow_mut().insert(id, stream); + self.metrics.increment_streams(1); + Ok(()) + } + pub async fn create_stream( &self, session: &Session, @@ -199,6 +212,25 @@ impl IggyShard { if self.streams_ids.borrow().contains_key(name) { return Err(IggyError::StreamNameAlreadyExists(name.to_owned())); } + let stream = self + .create_stream_base(stream_id, name) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to create stream with name: {name}") + })?; + let id = stream.stream_id; + + stream.persist().await?; + info!("Created stream with ID: {id}, name: '{name}'."); + self.streams_ids.borrow_mut().insert(name.to_owned(), id); + self.streams.borrow_mut().insert(id, stream); + self.metrics.increment_streams(1); + Ok(Identifier::numeric(id)?) + } + + fn create_stream_base(&self, stream_id: Option<u32>, name: &str) -> Result<Stream, IggyError> { + if self.streams_ids.borrow().contains_key(name) { + return Err(IggyError::StreamNameAlreadyExists(name.to_owned())); + } let mut id; if stream_id.is_none() { @@ -222,14 +254,7 @@ impl IggyShard { } let stream = Stream::create(id, name, self.config.system.clone(), self.storage.clone()); - stream.persist().await?; - info!("Created stream with ID: {id}, name: '{name}'."); - self.streams_ids - .borrow_mut() - .insert(name.to_owned(), stream.stream_id); - self.streams.borrow_mut().insert(stream.stream_id, stream); - self.metrics.increment_streams(1); - Ok(Identifier::numeric(id)?) 
+ Ok(stream) } pub async fn update_stream( diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 252dae30..c247cf74 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -101,6 +101,52 @@ impl IggyShard { Ok(Some(topic)) } + pub async fn create_topic_bypass_auth( + &self, + stream_id: &Identifier, + topic_id: Option<u32>, + name: &str, + partitions_count: u32, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + replication_factor: Option<u8>, + ) -> Result<(), IggyError> { + let (stream_id, topic_id, partition_ids) = self + .create_topic_base( + stream_id, + topic_id, + name, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + ) + .await?; + // TODO: Figure out a way to distribute the shards table among different shards, + // without the need to run the code below every time we handle a `ShardEvent`. + // I think we shouldn't be sharing it, maintain a single shard table per shard, + // but figure out a way to distribute it smarter (maybe move the broadcast inside of the `insert_shard_table_records` method). + let records = partition_ids.into_iter().map(|partition_id| { + let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); + // TODO: This setup isn't deterministic. + // Imagine a scenario where client creates partition using `String` identifiers, + // but then for poll_messages requests uses numeric ones. + // the namespace wouldn't match, therefore we would get a miss in the shard table.
+ let hash = namespace.generate_hash(); + let shard_id = hash % self.get_available_shards_count(); + let shard_info = ShardInfo::new(shard_id as u16); + (namespace, shard_info) + }); + self.insert_shard_table_records(records); + + self.metrics.increment_topics(1); + self.metrics.increment_partitions(partitions_count); + self.metrics.increment_segments(partitions_count); + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub async fn create_topic( &self, @@ -130,26 +176,18 @@ impl IggyShard { })?; } - // TODO: Make create topic sync, and extract the storage persister out of it - // perform disk i/o outside of the borrow_mut of the stream. - let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}") - })?; - let stream_id = stream.stream_id; - let (topic_id, partition_ids) = stream - .create_topic( + let (stream_id, topic_id, partition_ids) = self + .create_topic_base( + stream_id, topic_id, name, partitions_count, message_expiry, compression_algorithm, max_topic_size, - replication_factor.unwrap_or(1), + replication_factor, ) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to create topic with name: {name} in stream ID: {stream_id}") - })?; + .await?; let records = partition_ids.into_iter().map(|partition_id| { let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); // TODO: This setup isn't deterministic. @@ -170,6 +208,40 @@ impl IggyShard { Ok(Identifier::numeric(topic_id)?) 
} + async fn create_topic_base( + &self, + stream_id: &Identifier, + topic_id: Option<u32>, + name: &str, + partitions_count: u32, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + replication_factor: Option<u8>, + ) -> Result<(u32, u32, Vec<u32>), IggyError> { + // TODO: Make create topic sync, and extract the storage persister out of it + // perform disk i/o outside of the borrow_mut of the stream. + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let (topic_id, partition_ids) = stream + .create_topic( + topic_id, + name, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor.unwrap_or(1), + ) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to create topic with name: {name} in stream ID: {stream_id}") + })?; + Ok((stream_id, topic_id, partition_ids)) + } + #[allow(clippy::too_many_arguments)] pub async fn update_topic( &self, diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs index f3b271a7..f0e87fc0 100644 --- a/core/server/src/shard/system/users.rs +++ b/core/server/src/shard/system/users.rs @@ -372,6 +372,21 @@ impl IggyShard { Ok(()) } + pub fn login_user_event( + &self, + client_id: u32, + username: &str, + password: &str, + ) -> Result<(), IggyError> { + let active_sessions = self.active_sessions.borrow(); + let session = active_sessions + .iter() + .find(|s| s.client_id == client_id) + .expect(format!("At this point session for {}, should exist.", client_id).as_str()); + self.login_user_with_credentials(username, Some(password), Some(session))?; + Ok(()) + } + pub fn login_user( &self, username: &str, diff --git a/core/server/src/shard/tasks/messages.rs 
b/core/server/src/shard/tasks/messages.rs new file mode 100644 index 00000000..87660290 --- /dev/null +++ b/core/server/src/shard/tasks/messages.rs @@ -0,0 +1,38 @@ +use futures::StreamExt; +use iggy_common::IggyError; +use std::rc::Rc; +use tracing::error; + +use crate::shard::{IggyShard, transmission::frame::ShardFrame}; + +async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(), IggyError> { + let mut messages_receiver = shard.messages_receiver.take().unwrap(); + loop { + if let Some(frame) = messages_receiver.next().await { + let ShardFrame { + message, + response_sender, + } = frame; + match (shard.handle_shard_message(message).await, response_sender) { + (Some(response), Some(response_sender)) => { + response_sender + .send(response) + .await + .expect("Failed to send response back to origin shard."); + } + _ => {} + }; + } + } +} + +pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(), IggyError> { + monoio::spawn(async move { + let result = run_shard_messages_receiver(shard).await; + if let Err(err) = &result { + error!("Error running shard: {err}"); + } + result + }) + .await +} diff --git a/core/server/src/shard/tasks/mod.rs b/core/server/src/shard/tasks/mod.rs new file mode 100644 index 00000000..ba63992f --- /dev/null +++ b/core/server/src/shard/tasks/mod.rs @@ -0,0 +1 @@ +pub mod messages; diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index 45815a50..49794c3f 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -4,26 +4,27 @@ use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize}; use crate::streaming::clients::client_manager::Transport; +#[derive(Debug)] pub enum ShardEvent { CreatedStream { stream_id: Option<u32>, - topic_id: Identifier + name: String, }, //DeletedStream(Identifier), //UpdatedStream(Identifier, String), //PurgedStream(Identifier), //CreatedPartitions(Identifier, 
Identifier, u32), //DeletedPartitions(Identifier, Identifier, u32), - CreatedTopic( - Identifier, - Option<u32>, - String, - u32, - IggyExpiry, - CompressionAlgorithm, - MaxTopicSize, - Option<u8>, - ), + CreatedTopic { + stream_id: Identifier, + topic_id: Option<u32>, + name: String, + partitions_count: u32, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + replication_factor: Option<u8>, + }, //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String), //DeletedConsumerGroup(Identifier, Identifier, Identifier), /* @@ -42,6 +43,7 @@ pub enum ShardEvent { //CreatedUser(String, String, UserStatus, Option<Permissions>), //DeletedUser(Identifier), LoginUser { + client_id: u32, username: String, password: String, }, @@ -53,8 +55,7 @@ pub enum ShardEvent { //LoginWithPersonalAccessToken(String), //StoredConsumerOffset(Identifier, Identifier, PollingConsumer, u64), NewSession { - user_id: u32, - socket_addr: SocketAddr, + address: SocketAddr, transport: Transport, }, -} \ No newline at end of file +} diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index ff519ab4..6539ec4c 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -18,13 +18,16 @@ use async_channel::Sender; use iggy_common::IggyError; -use crate::{binary::handlers::messages::poll_messages_handler::IggyPollMetadata, shard::transmission::message::ShardMessage, streaming::segments::IggyMessagesBatchSet}; +use crate::{ + binary::handlers::messages::poll_messages_handler::IggyPollMetadata, + shard::transmission::message::ShardMessage, streaming::segments::IggyMessagesBatchSet, +}; #[derive(Debug)] pub enum ShardResponse { PollMessages((IggyPollMetadata, IggyMessagesBatchSet)), SendMessages, - ShardEvent, + Event, ErrorResponse(IggyError), } diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs index 360e7a3c..aa196fde 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -1,4 +1,4 @@ -use std::rc::Rc; +use std::{rc::Rc, sync::Arc}; use iggy_common::PollingStrategy; @@ -19,29 +19,47 @@ use iggy_common::PollingStrategy; * specific language governing permissions and limitations * under the License. */ -use crate::{shard::system::messages::PollingArgs, streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut, session::Session}}; +use crate::{ + shard::{system::messages::PollingArgs, transmission::event::ShardEvent}, + streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut}, +}; #[derive(Debug)] pub enum ShardMessage { Request(ShardRequest), - Event(ShardEvent), + Event(Arc<ShardEvent>), } #[derive(Debug)] -pub enum ShardEvent { - NewSession(), +pub struct ShardRequest { + pub stream_id: u32, + pub topic_id: u32, + pub partition_id: u32, + pub payload: ShardRequestPayload, } -#[derive(Debug)] -pub enum ShardRequest { - SendMessages { +impl ShardRequest { + pub fn new( stream_id: u32, topic_id: u32, partition_id: u32, + payload: ShardRequestPayload, + ) -> Self { + Self { + stream_id, + topic_id, + partition_id, + payload, + } + } +} + +#[derive(Debug)] +pub enum ShardRequestPayload { + SendMessages { batch: IggyMessagesBatchMut, }, PollMessages { - partition_id: u32, args: PollingArgs, consumer: PollingConsumer, count: u32, @@ -54,8 +72,8 @@ impl From<ShardRequest> for ShardMessage { } } -impl From<ShardEvent> for ShardMessage { - fn from(event: ShardEvent) -> Self { +impl From<Arc<ShardEvent>> for ShardMessage { + fn from(event: Arc<ShardEvent>) -> Self { ShardMessage::Event(event) } } diff --git a/core/server/src/shard/transmission/mod.rs b/core/server/src/shard/transmission/mod.rs index 4ed9bd1e..107bdcbd 100644 --- a/core/server/src/shard/transmission/mod.rs +++ b/core/server/src/shard/transmission/mod.rs 
@@ -16,7 +16,7 @@ * under the License. */ -pub mod event; pub mod connector; +pub mod event; pub mod frame; pub mod message; diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index b845d877..e8eb6ee6 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -117,7 +117,6 @@ impl State for FileState { ) }) .map_err(|_| IggyError::CannotReadFile)?; - let file = IggyFile::new(file); let file_size = file .metadata() .await @@ -129,6 +128,8 @@ impl State for FileState { }) .map_err(|_| IggyError::CannotReadFileMetadata)? .len(); + + let file = IggyFile::new(file); if file_size == 0 { info!("State file is empty"); return Ok(Vec::new()); @@ -150,7 +151,8 @@ impl State for FileState { .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 8; - if entries_count > 0 && index != current_index + 1 { + // Greater than one, because one of the entries after a fresh reboot is the default root user. + if entries_count > 1 && index != current_index + 1 { error!( "State file is corrupted, expected index: {}, got: {}", current_index + 1, diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs index c373ce0f..240e62a1 100644 --- a/core/server/src/streaming/segments/indexes/index_reader.rs +++ b/core/server/src/streaming/segments/indexes/index_reader.rs @@ -17,7 +17,7 @@ */ use super::IggyIndexesMut; -use crate::streaming::utils::PooledBuffer; +use crate::{io::file::IggyFile, streaming::utils::PooledBuffer}; use bytes::BytesMut; use error_set::ErrContext; use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView}; @@ -30,15 +30,14 @@ use std::{ atomic::{AtomicU64, Ordering}, }, }; -use tokio::fs::OpenOptions; -use tokio::task::spawn_blocking; +use monoio::fs::OpenOptions; use tracing::{error, trace}; /// A dedicated struct for reading from the index file. 
#[derive(Debug)] pub struct IndexReader { file_path: String, - file: Arc<StdFile>, + file: IggyFile, index_size_bytes: Arc<AtomicU64>, } @@ -52,13 +51,14 @@ impl IndexReader { .with_error_context(|error| format!("Failed to open index file: {file_path}. {error}")) .map_err(|_| IggyError::CannotReadFile)?; + let file = IggyFile::new(file); trace!( "Opened index file for reading: {file_path}, size: {}", index_size_bytes.load(Ordering::Acquire) ); Ok(Self { file_path: file_path.to_string(), - file: Arc::new(file.into_std().await), + file, index_size_bytes, }) } @@ -334,21 +334,19 @@ impl IndexReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - let file = self.file.clone(); - spawn_blocking(move || { if use_pool { let mut buf = PooledBuffer::with_capacity(len as usize); unsafe { buf.set_len(len as usize) }; - file.read_exact_at(&mut buf, offset as u64)?; + let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; + result?; Ok(buf) } else { let mut buf = BytesMut::with_capacity(len as usize); unsafe { buf.set_len(len as usize) }; - file.read_exact_at(&mut buf, offset as u64)?; + let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; + result?; Ok(PooledBuffer::from_existing(buf)) } - }) - .await? } /// Gets the nth index from the index file. 
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index 71b18b6d..120bfe4e 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -19,21 +19,22 @@ use error_set::ErrContext; use iggy_common::INDEX_SIZE; use iggy_common::IggyError; +use monoio::fs::OpenOptions; +use monoio::io::AsyncWriteRentExt; use std::sync::{ Arc, atomic::{AtomicU64, Ordering}, }; -use tokio::{ - fs::{File, OpenOptions}, - io::AsyncWriteExt, -}; use tracing::trace; +use crate::io::file::IggyFile; +use crate::streaming::utils::PooledBuffer; + /// A dedicated struct for writing to the index file. #[derive(Debug)] pub struct IndexWriter { file_path: String, - file: File, + file: IggyFile, index_size_bytes: Arc<AtomicU64>, fsync: bool, } @@ -55,6 +56,7 @@ impl IndexWriter { .with_error_context(|error| format!("Failed to open index file: {file_path}. {error}")) .map_err(|_| IggyError::CannotReadFile)?; + let file = IggyFile::from(file); if file_exists { let _ = file.sync_all().await.with_error_context(|error| { format!("Failed to fsync index file after creation: {file_path}. {error}",) @@ -86,16 +88,18 @@ impl IndexWriter { } /// Appends multiple index buffer to the index file in a single operation. - pub async fn save_indexes(&mut self, indexes: &[u8]) -> Result<(), IggyError> { + pub async fn save_indexes(&mut self, indexes: PooledBuffer) -> Result<(), IggyError> { if indexes.is_empty() { return Ok(()); } let count = indexes.len() / INDEX_SIZE; + let len = indexes.len(); self.file .write_all(indexes) .await + .0 .with_error_context(|error| { format!( "Failed to write {} indexes to file: {}. 
{error}", @@ -105,7 +109,7 @@ impl IndexWriter { .map_err(|_| IggyError::CannotSaveIndexToSegment)?; self.index_size_bytes - .fetch_add(indexes.len() as u64, Ordering::Release); + .fetch_add(len as u64, Ordering::Release); if self.fsync { let _ = self.fsync().await; diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs b/core/server/src/streaming/segments/indexes/indexes_mut.rs index 0dacb907..1b5e8e1e 100644 --- a/core/server/src/streaming/segments/indexes/indexes_mut.rs +++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs @@ -221,9 +221,12 @@ impl IggyIndexesMut { } /// Gets the unsaved part of the index buffer - pub fn unsaved_slice(&self) -> &[u8] { + pub fn unsaved_slice(&self) -> PooledBuffer { let start_pos = self.saved_count as usize * INDEX_SIZE; - &self.buffer[start_pos..] + // TODO: Dunno how to handle this better, maybe we should have a `split` method, + // That splits the underlying Indexes buffer into two parts + // saved on disk and not saved yet. 
+ PooledBuffer::from(&self.buffer[start_pos..]) } /// Mark all indexes as saved to disk diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index 512c3894..b79b1af0 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -19,71 +19,35 @@ mod messages_reader; mod messages_writer; -use crate::{io::file::IggyFile, to_iovec}; +use crate::{io::file::IggyFile}; use super::IggyMessagesBatchSet; use error_set::ErrContext; use iggy_common::IggyError; -use monoio::io::AsyncWriteRent; -use std::{io::IoSlice, mem::take}; +use monoio::io::AsyncWriteRentExt; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; -use nix::libc::iovec; /// Vectored write a batches of messages to file async fn write_batch( file: &mut IggyFile, file_path: &str, - batches: IggyMessagesBatchSet, + mut batches: IggyMessagesBatchSet, ) -> Result<usize, IggyError> { - let mut slices = batches.iter().map(|b| to_iovec(&b)).collect::<Vec<iovec>>(); + //let mut slices = batches.iter().map(|b| to_iovec(&b)).collect::<Vec<iovec>>(); let mut total_written = 0; - - loop { - let (result, vomited) = file.writev(slices).await; - slices = vomited; - let bytes_written = result - .with_error_context(|error| { - format!("Failed to write messages to file: {file_path}, error: {error}",) - }) - .map_err(|_| IggyError::CannotWriteToFile)?; - - total_written += bytes_written; - advance_slices(&mut slices.as_mut_slice(), bytes_written); - if slices.is_empty() { - break; - } + // TODO: Fork monoio (or upstream vectored-write support) to restore the writev path; + // until then we fall back to sequential per-batch writes. + for batch in batches.iter_mut() { + let messages = batch.take_messages(); + let writen = file.write_all(messages).await.0.with_error_context(|error| { + format!( + "Failed to write messages to file: {file_path}, error: {error}", ) + // TODO: Better error variant.
+ }).map_err(|_| IggyError::CannotAppendMessage)?; + total_written += writen; } - Ok(total_written) } - -fn advance_slices(mut bufs: &mut [iovec], n: usize) { - // Number of buffers to remove. - let mut remove = 0; - // Remaining length before reaching n. This prevents overflow - // that could happen if the length of slices in `bufs` were instead - // accumulated. Those slice may be aliased and, if they are large - // enough, their added length may overflow a `usize`. - let mut left = n; - for buf in bufs.iter() { - if let Some(remainder) = left.checked_sub(buf.iov_len as _) { - left = remainder; - remove += 1; - } else { - break; - } - } - - bufs = &mut bufs[remove..]; - if bufs.is_empty() { - assert!(left == 0, "advancing io slices beyond their length"); - } else { - unsafe { - bufs[0].iov_len -= n; - bufs[0].iov_base = bufs[0].iov_base.add(n); - } - } -} diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs index 92dd254f..9887df90 100644 --- a/core/server/src/streaming/segments/segment.rs +++ b/core/server/src/streaming/segments/segment.rs @@ -313,7 +313,8 @@ impl Segment { } if let Some(index_writer) = self.index_writer.take() { - tokio::spawn(async move { + //TODO: Fixme not sure whether we should spawn a task here. 
+ monoio::spawn(async move { let _ = index_writer.fsync().await; drop(index_writer) }); diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs index ea549291..6a76b582 100644 --- a/core/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs @@ -264,6 +264,10 @@ impl IggyMessagesBatchMut { (indexes, messages) } + pub fn take_messages(&mut self) -> PooledBuffer { + std::mem::take(&mut self.messages) + } + /// Take the indexes from the batch pub fn take_indexes(&mut self) -> IggyIndexesMut { std::mem::take(&mut self.indexes) diff --git a/core/server/src/streaming/segments/writing_messages.rs b/core/server/src/streaming/segments/writing_messages.rs index 9c385b02..8313f589 100644 --- a/core/server/src/streaming/segments/writing_messages.rs +++ b/core/server/src/streaming/segments/writing_messages.rs @@ -99,6 +99,7 @@ impl Segment { self.last_index_position += saved_bytes.as_bytes_u64() as u32; let unsaved_indexes_slice = self.indexes.unsaved_slice(); + let len = unsaved_indexes_slice.len(); self.index_writer .as_mut() .expect("Index writer not initialized") @@ -107,7 +108,7 @@ impl Segment { .with_error_context(|error| { format!( "Failed to save index of {} indexes to {self}. 
{error}", - unsaved_indexes_slice.len() + len ) })?; diff --git a/core/server/src/streaming/streams/storage.rs b/core/server/src/streaming/streams/storage.rs index ebd4946a..5bf4c554 100644 --- a/core/server/src/streaming/streams/storage.rs +++ b/core/server/src/streaming/streams/storage.rs @@ -20,7 +20,6 @@ use crate::state::system::StreamState; use crate::streaming::storage::StreamStorage; use crate::streaming::streams::COMPONENT; use crate::streaming::streams::stream::Stream; -use crate::streaming::topics::topic::CreatedTopicInfo; use crate::streaming::topics::topic::Topic; use ahash::AHashSet; use error_set::ErrContext; @@ -28,11 +27,11 @@ use futures::future::join_all; use iggy_common::IggyError; use iggy_common::IggyTimestamp; use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; use std::path::Path; use std::sync::Arc; -use tokio::fs; -use tokio::fs::create_dir_all; -use tokio::sync::Mutex; +use monoio::fs; +use monoio::fs::create_dir_all; use tracing::{error, info, warn}; #[derive(Debug)] @@ -52,13 +51,13 @@ impl StreamStorage for FileStreamStorage { } let mut unloaded_topics = Vec::new(); - let dir_entries = fs::read_dir(&stream.topics_path).await; + let dir_entries = std::fs::read_dir(&stream.topics_path); if dir_entries.is_err() { return Err(IggyError::CannotReadTopics(stream.stream_id)); } let mut dir_entries = dir_entries.unwrap(); - while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { + while let Some(dir_entry) = dir_entries.next().transpose().unwrap_or(None) { let name = dir_entry.file_name().into_string().unwrap(); let topic_id = name.parse::<u32>(); if topic_id.is_err() { @@ -73,7 +72,8 @@ impl StreamStorage for FileStreamStorage { error!( "Topic with ID: '{topic_id}' for stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed." ); - if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { + // TODO: Replace this with the dir walk impl that is mentioned in main function.
+ if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) { error!("Cannot remove topic directory: {error}"); } else { warn!( @@ -151,6 +151,7 @@ impl StreamStorage for FileStreamStorage { } } + // TODO: Refactor.... let loaded_topics = Arc::new(Mutex::new(Vec::new())); let mut load_topics = Vec::new(); for mut topic in unloaded_topics { @@ -222,7 +223,7 @@ impl StreamStorage for FileStreamStorage { async fn delete(&self, stream: &Stream) -> Result<(), IggyError> { info!("Deleting stream with ID: {}...", stream.stream_id); - if fs::remove_dir_all(&stream.path).await.is_err() { + if std::fs::remove_dir_all(&stream.path).is_err() { return Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id)); } info!("Deleted stream with ID: {}.", stream.stream_id); diff --git a/core/server/src/streaming/topics/messages.rs b/core/server/src/streaming/topics/messages.rs index 2f8f0557..f56ed560 100644 --- a/core/server/src/streaming/topics/messages.rs +++ b/core/server/src/streaming/topics/messages.rs @@ -25,8 +25,8 @@ use crate::streaming::utils::hash; use ahash::AHashMap; use error_set::ErrContext; use iggy_common::locking::IggySharedMutFn; -use iggy_common::{IggyTimestamp, PollingStrategy}; use iggy_common::{IggyError, IggyExpiry, Partitioning, PartitioningKind, PollingKind}; +use iggy_common::{IggyTimestamp, PollingStrategy}; use std::sync::atomic::Ordering; use tracing::trace; diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs index 827ba6c9..1025dbd3 100644 --- a/core/server/src/streaming/topics/storage.rs +++ b/core/server/src/streaming/topics/storage.rs @@ -29,11 +29,11 @@ use futures::future::join_all; use iggy_common::IggyError; use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggySharedMutFn; +use monoio::fs::create_dir_all; use serde::{Deserialize, Serialize}; use std::path::Path; use std::sync::Arc; -use tokio::fs; -use tokio::fs::create_dir_all; +use monoio::fs; use tokio::sync::Mutex; use 
tracing::{error, info, warn}; @@ -61,14 +61,14 @@ impl TopicStorage for FileTopicStorage { topic.compression_algorithm = state.compression_algorithm; topic.replication_factor = state.replication_factor.unwrap_or(1); - let mut dir_entries = fs::read_dir(&topic.partitions_path).await + let mut dir_entries = std::fs::read_dir(&topic.partitions_path) .with_context(|| format!("Failed to read partition with ID: {} for stream with ID: {} for topic with ID: {} and path: {}", topic.topic_id, topic.stream_id, topic.topic_id, &topic.partitions_path)) .map_err(|_| IggyError::CannotReadPartitions)?; let mut unloaded_partitions = Vec::new(); - while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { - let metadata = dir_entry.metadata().await; + while let Some(dir_entry) = dir_entries.next().transpose().unwrap_or(None) { + let metadata = dir_entry.metadata(); if metadata.is_err() || metadata.unwrap().is_file() { continue; } @@ -88,7 +88,8 @@ impl TopicStorage for FileTopicStorage { error!( "Partition with ID: '{partition_id}' for stream with ID: '{stream_id}' and topic with ID: '{topic_id}' was not found in state, but exists on disk and will be removed." ); - if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { + // TODO: Replace this with the dir walk impl that is mentioned in main function.
+ if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) { error!("Cannot remove partition directory: {error}"); } else { warn!( @@ -269,7 +270,7 @@ impl TopicStorage for FileTopicStorage { async fn delete(&self, topic: &Topic) -> Result<(), IggyError> { info!("Deleting topic {topic}..."); - if fs::remove_dir_all(&topic.path).await.is_err() { + if std::fs::remove_dir_all(&topic.path).is_err() { return Err(IggyError::CannotDeleteTopicDirectory( topic.topic_id, topic.stream_id, diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index 198e8626..16ad2087 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -155,7 +155,8 @@ impl MemoryPool { /// Initialize the global pool from the given config. pub fn init_pool(config: Arc<SystemConfig>) { - let is_enabled = config.memory_pool.enabled; + // TODO: Fixme (stary napraw). + let is_enabled = false; let memory_limit = config.memory_pool.size.as_bytes_usize(); let bucket_capacity = config.memory_pool.bucket_capacity as usize; diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs index 0aa1dd3a..a8664325 100644 --- a/core/server/src/streaming/utils/pooled_buffer.rs +++ b/core/server/src/streaming/utils/pooled_buffer.rs @@ -18,7 +18,7 @@ use super::memory_pool::{BytesMutExt, memory_pool}; use bytes::{Buf, BufMut, BytesMut}; -use monoio::buf::IoBufMut; +use monoio::buf::{IoBuf, IoBufMut}; use std::ops::{Deref, DerefMut}; #[derive(Debug)] @@ -230,3 +230,13 @@ unsafe impl IoBufMut for PooledBuffer { unsafe { self.inner.set_init(pos) } } } + +unsafe impl IoBuf for PooledBuffer { + fn read_ptr(&self) -> *const u8 { + self.inner.read_ptr() + } + + fn bytes_init(&self) -> usize { + self.inner.bytes_init() + } +} diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 9c92d27d..5622d2a8 100644 --- 
a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -35,8 +35,14 @@ pub(crate) async fn handle_connection( sender: &mut SenderKind, shard: &Rc<IggyShard>, ) -> Result<(), ConnectionError> { - let length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); - let code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); + let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); + unsafe { + length_buffer.set_len(INITIAL_BYTES_LENGTH); + } + let mut code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); + unsafe { + code_buffer.set_len(INITIAL_BYTES_LENGTH); + } loop { let (read_length, initial_buffer) = match sender.read(length_buffer.clone()).await { (Ok(read_length), initial_buffer) => (read_length, initial_buffer), diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 2af9f494..2f445d4f 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -119,14 +119,14 @@ where status ); let prefix = [ - libc::iovec { - iov_base: length.as_ptr() as _, - iov_len: length.len(), - }, libc::iovec { iov_base: status.as_ptr() as _, iov_len: status.len(), }, + libc::iovec { + iov_base: length.as_ptr() as _, + iov_len: length.len(), + }, ]; slices.splice(0..0, prefix); stream diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index ac29e2b9..c0080edf 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -18,15 +18,16 @@ use crate::binary::sender::SenderKind; use crate::shard::IggyShard; -use crate::shard::transmission::message::ShardEvent; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_socket; +use iggy_common::IggyError; use std::net::SocketAddr; use std::rc::Rc; use tracing::{error, info}; -pub async fn start(server_name: &'static str, 
shard: Rc<IggyShard>) { +pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> Result<(), IggyError> { let ip_v6 = shard.config.tcp.ipv6; let socket_config = &shard.config.tcp.socket; let addr: SocketAddr = shard @@ -40,8 +41,8 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) { monoio::spawn(async move { socket .bind(&addr.into()) - .expect("Failed to bind eTCP listener"); - socket.listen(1024); + .expect("Failed to bind TCP listener"); + socket.listen(1024).unwrap(); let listener: std::net::TcpListener = socket.into(); let listener = monoio::net::TcpListener::from_std(listener).unwrap(); info!("{server_name} server has started on: {:?}", addr); @@ -50,15 +51,14 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) { Ok((stream, address)) => { let shard = shard.clone(); info!("Accepted new TCP connection: {address}"); - let session = shard.add_client(&address, Transport::Tcp); + let transport = Transport::Tcp; + let session = shard.add_client(&address, transport); //TODO: Those can be shared with other shards. shard.add_active_session(session.clone()); // Broadcast session to all shards. - //TODO: Fixme - /* - let event = Rc::new(ShardEvent::NewSession(session.clone())); - shard.broadcast_event_to_all_shards(session.client_id, event); - */ + let event = ShardEvent::NewSession { address, transport }; + // TODO: Fixme look inside of broadcast_event_to_all_shards method. + let _responses = shard.broadcast_event_to_all_shards(event.into()); let _client_id = session.client_id; info!("Created new session: {session}"); @@ -85,5 +85,6 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) { Err(error) => error!("Unable to accept TCP socket. 
{error}"), } } - }); + }) + .await } diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs index 9b8e1c04..0430f773 100644 --- a/core/server/src/tcp/tcp_server.rs +++ b/core/server/src/tcp/tcp_server.rs @@ -24,17 +24,18 @@ use tracing::info; /// Starts the TCP server. /// Returns the address the server is listening on. -pub async fn start(shard: Rc<IggyShard>) -> Result<(), IggyError> { +pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> { let server_name = if shard.config.tcp.tls.enabled { "Iggy TCP TLS" } else { "Iggy TCP" }; info!("Initializing {server_name} server..."); - let addr = match shard.config.tcp.tls.enabled { + // TODO: Fixme -- storing addr of the server inside of the config for integration tests... + let result = match shard.config.tcp.tls.enabled { true => unimplemented!("TLS support is not implemented yet"), false => tcp_listener::start(server_name, shard).await, }; - info!("{server_name} server has started on: {:?}", addr); - Ok(()) + //info!("{server_name} server has started on: {:?}", addr); + result } diff --git a/core/server/src/tcp/tcp_socket.rs b/core/server/src/tcp/tcp_socket.rs index 747540fa..7ea3acf0 100644 --- a/core/server/src/tcp/tcp_socket.rs +++ b/core/server/src/tcp/tcp_socket.rs @@ -30,6 +30,15 @@ pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket { .expect("Unable to create an ipv4 socket") }; + // Required by the thread-per-core model... + // We create a bunch of sockets on different threads that bind to exactly the same address and port.
+ socket + .set_reuse_address(true) + .expect("Unable to set SO_REUSEADDR on socket"); + socket + .set_reuse_port(true) + .expect("Unable to set SO_REUSEPORT on socket"); + if config.override_defaults { config .recv_buffer_size @@ -54,12 +63,6 @@ pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket { socket .set_linger(Some(config.linger.get_duration())) .expect("Unable to set SO_LINGER on socket"); - socket - .set_reuse_address(true) - .expect("Unable to set SO_REUSEADDR on socket"); - socket - .set_reuse_port(true) - .expect("Unable to set SO_REUSEPORT on socket"); } socket
