This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 235a7cc7263cc1d4bd2efed0d6a0847fd17ab344
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sun Jun 29 22:49:43 2025 +0200

    feat(io_uring): fix send/poll (#1930)
---
 Cargo.lock                                         |  12 ++
 core/integration/tests/streaming/get_by_offset.rs  |   3 +-
 .../tests/streaming/get_by_timestamp.rs            |   4 +-
 core/integration/tests/streaming/messages.rs       |   6 +-
 core/integration/tests/streaming/partition.rs      |  15 +--
 core/integration/tests/streaming/stream.rs         |   6 +-
 core/integration/tests/streaming/topic.rs          |  29 ++--
 core/integration/tests/streaming/topic_messages.rs |  25 ++--
 core/server/Cargo.toml                             |   2 +-
 .../handlers/messages/poll_messages_handler.rs     |  67 ++++-----
 .../handlers/messages/send_messages_handler.rs     |  64 ++++-----
 .../handlers/streams/create_stream_handler.rs      |  12 +-
 .../binary/handlers/topics/create_topic_handler.rs |  18 ++-
 .../binary/handlers/users/login_user_handler.rs    |  18 ++-
 core/server/src/bootstrap.rs                       |  11 +-
 core/server/src/configs/displays.rs                |   5 +-
 core/server/src/io/file.rs                         |   9 +-
 core/server/src/main.rs                            |  35 ++---
 core/server/src/shard/builder.rs                   |   2 +-
 core/server/src/shard/mod.rs                       | 149 ++++++++++++++++-----
 core/server/src/shard/system/messages.rs           |   4 +-
 core/server/src/shard/system/streams.rs            |  41 ++++--
 core/server/src/shard/system/topics.rs             |  98 ++++++++++++--
 core/server/src/shard/system/users.rs              |  15 +++
 core/server/src/shard/tasks/messages.rs            |  38 ++++++
 core/server/src/shard/tasks/mod.rs                 |   1 +
 core/server/src/shard/transmission/event.rs        |  29 ++--
 core/server/src/shard/transmission/frame.rs        |   7 +-
 core/server/src/shard/transmission/message.rs      |  40 ++++--
 core/server/src/shard/transmission/mod.rs          |   2 +-
 core/server/src/state/file.rs                      |   6 +-
 .../src/streaming/segments/indexes/index_reader.rs |  20 ++-
 .../src/streaming/segments/indexes/index_writer.rs |  18 ++-
 .../src/streaming/segments/indexes/indexes_mut.rs  |   7 +-
 core/server/src/streaming/segments/messages/mod.rs |  64 ++-------
 core/server/src/streaming/segments/segment.rs      |   3 +-
 .../streaming/segments/types/messages_batch_mut.rs |   4 +
 .../src/streaming/segments/writing_messages.rs     |   3 +-
 core/server/src/streaming/streams/storage.rs       |  17 +--
 core/server/src/streaming/topics/messages.rs       |   2 +-
 core/server/src/streaming/topics/storage.rs        |  15 ++-
 core/server/src/streaming/utils/memory_pool.rs     |   3 +-
 core/server/src/streaming/utils/pooled_buffer.rs   |  12 +-
 core/server/src/tcp/connection_handler.rs          |  10 +-
 core/server/src/tcp/sender.rs                      |   8 +-
 core/server/src/tcp/tcp_listener.rs                |  23 ++--
 core/server/src/tcp/tcp_server.rs                  |   9 +-
 core/server/src/tcp/tcp_socket.rs                  |  15 ++-
 48 files changed, 631 insertions(+), 375 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f06db7bf..8a3b9e16 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4875,6 +4875,7 @@ checksum = 
"3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
 dependencies = [
  "auto-const-array",
  "bytes",
+ "flume",
  "fxhash",
  "io-uring",
  "libc",
@@ -4882,8 +4883,10 @@ dependencies = [
  "mio 0.8.11",
  "monoio-macros",
  "nix 0.26.4",
+ "once_cell",
  "pin-project-lite",
  "socket2",
+ "threadpool",
  "windows-sys 0.48.0",
 ]
 
@@ -7554,6 +7557,15 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "threadpool"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+dependencies = [
+ "num_cpus",
+]
+
 [[package]]
 name = "time"
 version = "0.3.41"
diff --git a/core/integration/tests/streaming/get_by_offset.rs 
b/core/integration/tests/streaming/get_by_offset.rs
index df9f8f40..e5cc2a33 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -127,8 +127,7 @@ async fn test_get_messages_by_offset(
         Arc::new(AtomicU64::new(0)),
         Arc::new(AtomicU32::new(0)),
         IggyTimestamp::now(),
-    )
-    .await;
+    );
 
     setup.create_partitions_directory(stream_id, topic_id).await;
     partition.persist().await.unwrap();
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs 
b/core/integration/tests/streaming/get_by_timestamp.rs
index 93a178aa..7aff4a89 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -18,7 +18,6 @@
 
 use crate::streaming::common::test_setup::TestSetup;
 use bytes::BytesMut;
-use iggy::prelude::Confirmation;
 use iggy::prelude::*;
 use server::configs::cache_indexes::CacheIndexesConfig;
 use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
@@ -129,8 +128,7 @@ async fn test_get_messages_by_timestamp(
         Arc::new(AtomicU64::new(0)),
         Arc::new(AtomicU32::new(0)),
         IggyTimestamp::now(),
-    )
-    .await;
+    );
 
     setup.create_partitions_directory(stream_id, topic_id).await;
     partition.persist().await.unwrap();
diff --git a/core/integration/tests/streaming/messages.rs 
b/core/integration/tests/streaming/messages.rs
index 6ab6ca5b..c6363d9a 100644
--- a/core/integration/tests/streaming/messages.rs
+++ b/core/integration/tests/streaming/messages.rs
@@ -58,8 +58,7 @@ async fn 
should_persist_messages_and_then_load_them_from_disk() {
         Arc::new(AtomicU64::new(0)),
         Arc::new(AtomicU32::new(0)),
         IggyTimestamp::now(),
-    )
-    .await;
+    );
 
     let mut messages = Vec::with_capacity(messages_count as usize);
     let mut appended_messages = Vec::with_capacity(messages_count as usize);
@@ -126,8 +125,7 @@ async fn 
should_persist_messages_and_then_load_them_from_disk() {
         Arc::new(AtomicU64::new(0)),
         Arc::new(AtomicU32::new(0)),
         now,
-    )
-    .await;
+    );
     let partition_state = PartitionState {
         id: partition.partition_id,
         created_at: now,
diff --git a/core/integration/tests/streaming/partition.rs 
b/core/integration/tests/streaming/partition.rs
index 4fff0d04..6d811e59 100644
--- a/core/integration/tests/streaming/partition.rs
+++ b/core/integration/tests/streaming/partition.rs
@@ -49,8 +49,7 @@ async fn should_persist_partition_with_segment() {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             IggyTimestamp::now(),
-        )
-        .await;
+        );
 
         partition.persist().await.unwrap();
 
@@ -81,8 +80,7 @@ async fn should_load_existing_partition_from_disk() {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             IggyTimestamp::now(),
-        )
-        .await;
+        );
         partition.persist().await.unwrap();
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
 
@@ -101,8 +99,7 @@ async fn should_load_existing_partition_from_disk() {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             now,
-        )
-        .await;
+        );
         let partition_state = PartitionState {
             id: partition.partition_id,
             created_at: now,
@@ -151,8 +148,7 @@ async fn should_delete_existing_partition_from_disk() {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             IggyTimestamp::now(),
-        )
-        .await;
+        );
         partition.persist().await.unwrap();
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
 
@@ -185,8 +181,7 @@ async fn should_purge_existing_partition_on_disk() {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             IggyTimestamp::now(),
-        )
-        .await;
+        );
         partition.persist().await.unwrap();
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
         let messages = create_messages();
diff --git a/core/integration/tests/streaming/stream.rs 
b/core/integration/tests/streaming/stream.rs
index 5b340750..677e2d32 100644
--- a/core/integration/tests/streaming/stream.rs
+++ b/core/integration/tests/streaming/stream.rs
@@ -145,10 +145,10 @@ async fn should_purge_existing_stream_on_disk() {
         let topic = stream
             .get_topic(&Identifier::numeric(topic_id).unwrap())
             .unwrap();
-        topic
-            .append_messages(&Partitioning::partition_id(1), batch)
-            .await
+        let partition_id = topic
+            .calculate_partition_id(&Partitioning::partition_id(1))
             .unwrap();
+        topic.append_messages(partition_id, batch).await.unwrap();
         let (_, loaded_messages) = topic
             .get_messages(
                 PollingConsumer::Consumer(1, 1),
diff --git a/core/integration/tests/streaming/topic.rs 
b/core/integration/tests/streaming/topic.rs
index f75ffe2b..e3d17df2 100644
--- a/core/integration/tests/streaming/topic.rs
+++ b/core/integration/tests/streaming/topic.rs
@@ -38,7 +38,7 @@ async fn 
should_persist_topics_with_partitions_directories_and_info_file() {
     let topic_ids = get_topic_ids();
     for topic_id in topic_ids {
         let name = format!("test-{topic_id}");
-        let topic = Topic::create(
+        let created_topic_info = Topic::create(
             stream_id,
             topic_id,
             &name,
@@ -53,8 +53,8 @@ async fn 
should_persist_topics_with_partitions_directories_and_info_file() {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
         .unwrap();
+        let topic = created_topic_info.topic;
 
         topic.persist().await.unwrap();
 
@@ -76,7 +76,7 @@ async fn should_load_existing_topic_from_disk() {
     let topic_ids = get_topic_ids();
     for topic_id in topic_ids {
         let name = format!("test-{topic_id}");
-        let topic = Topic::create(
+        let created_topic_info = Topic::create(
             stream_id,
             topic_id,
             &name,
@@ -91,8 +91,9 @@ async fn should_load_existing_topic_from_disk() {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
         .unwrap();
+        let topic = created_topic_info.topic;
+
         topic.persist().await.unwrap();
         assert_persisted_topic(
             &topic.path,
@@ -102,7 +103,7 @@ async fn should_load_existing_topic_from_disk() {
         .await;
 
         let created_at = IggyTimestamp::now();
-        let mut loaded_topic = Topic::empty(
+        let created_topic_info = Topic::empty(
             stream_id,
             topic_id,
             &name,
@@ -111,8 +112,8 @@ async fn should_load_existing_topic_from_disk() {
             Arc::new(AtomicU32::new(0)),
             setup.config.clone(),
             setup.storage.clone(),
-        )
-        .await;
+        );
+        let mut loaded_topic = created_topic_info.topic;
         let topic_state = TopicState {
             id: topic_id,
             name,
@@ -153,7 +154,7 @@ async fn should_delete_existing_topic_from_disk() {
     let topic_ids = get_topic_ids();
     for topic_id in topic_ids {
         let name = format!("test-{topic_id}");
-        let topic = Topic::create(
+        let created_topic_info = Topic::create(
             stream_id,
             topic_id,
             &name,
@@ -168,8 +169,8 @@ async fn should_delete_existing_topic_from_disk() {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
         .unwrap();
+        let topic = created_topic_info.topic;
         topic.persist().await.unwrap();
         assert_persisted_topic(
             &topic.path,
@@ -193,7 +194,7 @@ async fn should_purge_existing_topic_on_disk() {
     let topic_ids = get_topic_ids();
     for topic_id in topic_ids {
         let name = format!("test-{topic_id}");
-        let topic = Topic::create(
+        let created_topic_info = Topic::create(
             stream_id,
             topic_id,
             &name,
@@ -208,8 +209,8 @@ async fn should_purge_existing_topic_on_disk() {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
         .unwrap();
+        let topic = created_topic_info.topic;
         topic.persist().await.unwrap();
         assert_persisted_topic(
             &topic.path,
@@ -225,10 +226,10 @@ async fn should_purge_existing_topic_on_disk() {
             .map(|msg| msg.get_size_bytes().as_bytes_u32())
             .sum::<u32>();
         let batch = IggyMessagesBatchMut::from_messages(&messages, batch_size);
-        topic
-            .append_messages(&Partitioning::partition_id(1), batch)
-            .await
+        let partition_id = topic
+            .calculate_partition_id(&Partitioning::partition_id(1))
             .unwrap();
+        topic.append_messages(partition_id, batch).await.unwrap();
         let (_, loaded_messages) = topic
             .get_messages(
                 PollingConsumer::Consumer(1, 1),
diff --git a/core/integration/tests/streaming/topic_messages.rs 
b/core/integration/tests/streaming/topic_messages.rs
index 19de255f..3e5362ac 100644
--- a/core/integration/tests/streaming/topic_messages.rs
+++ b/core/integration/tests/streaming/topic_messages.rs
@@ -62,7 +62,8 @@ async fn assert_polling_messages() {
         .map(|m| m.get_size_bytes())
         .sum::<IggyByteSize>();
     let batch = IggyMessagesBatchMut::from_messages(&messages, 
batch_size.as_bytes_u32());
-    topic.append_messages(&partitioning, batch).await.unwrap();
+    let partition_id = topic.calculate_partition_id(&partitioning).unwrap();
+    topic.append_messages(partition_id, batch).await.unwrap();
 
     let consumer = PollingConsumer::Consumer(1, partition_id);
     let (_, polled_messages) = topic
@@ -106,10 +107,8 @@ async fn 
given_key_none_messages_should_be_appended_to_the_next_partition_using_
                 .expect("Failed to create message")],
             batch_size.as_bytes_u32(),
         );
-        topic
-            .append_messages(&partitioning, messages)
-            .await
-            .unwrap();
+        let partition_id = 
topic.calculate_partition_id(&partitioning).unwrap();
+        topic.append_messages(partition_id, messages).await.unwrap();
     }
     for i in 1..=partitions_count {
         assert_messages(&topic, i, messages_per_partition_count).await;
@@ -135,10 +134,8 @@ async fn 
given_key_partition_id_messages_should_be_appended_to_the_chosen_partit
                 .expect("Failed to create message")],
             batch_size.as_bytes_u32(),
         );
-        topic
-            .append_messages(&partitioning, messages)
-            .await
-            .unwrap();
+        let partition_id = 
topic.calculate_partition_id(&partitioning).unwrap();
+        topic.append_messages(partition_id, messages).await.unwrap();
     }
 
     for i in 1..=partitions_count {
@@ -168,10 +165,8 @@ async fn 
given_key_messages_key_messages_should_be_appended_to_the_calculated_pa
                 .expect("Failed to create message")],
             batch_size.as_bytes_u32(),
         );
-        topic
-            .append_messages(&partitioning, messages)
-            .await
-            .unwrap();
+        let partition_id = 
topic.calculate_partition_id(&partitioning).unwrap();
+        topic.append_messages(partition_id, messages).await.unwrap();
     }
 
     let mut messages_count_per_partition = HashMap::new();
@@ -212,7 +207,7 @@ async fn init_topic(setup: &TestSetup, partitions_count: 
u32) -> Topic {
     setup.create_topics_directory(stream_id).await;
     let id = 2;
     let name = "test";
-    let topic = Topic::create(
+    let created_topic_info = Topic::create(
         stream_id,
         id,
         name,
@@ -227,8 +222,8 @@ async fn init_topic(setup: &TestSetup, partitions_count: 
u32) -> Topic {
         MaxTopicSize::ServerDefault,
         1,
     )
-    .await
     .unwrap();
+    let topic = created_topic_info.topic;
     topic.persist().await.unwrap();
     topic
 }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3b9694f5..8da2f495 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -66,7 +66,7 @@ lending-iterator = "0.1.7"
 hash32 = "1.0.0"
 mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
-monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat"] }
+monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat", 
"sync"] }
 monoio-native-tls = "0.4.0"
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index c590dcf2..3abfb6a1 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -21,10 +21,10 @@ use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
 use crate::shard::namespace::IggyNamespace;
+use crate::shard::system::messages::PollingArgs;
 use crate::shard::transmission::frame::ShardResponse;
-use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
 use crate::shard::{IggyShard, ShardRequestResult};
-use crate::shard::system::messages::PollingArgs;
 use crate::streaming::segments::IggyMessagesBatchSet;
 use crate::streaming::session::Session;
 use crate::to_iovec;
@@ -112,53 +112,44 @@ impl ServerCommandHandler for PollMessages {
         };
 
         let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, 
partition_id);
-        let request = ShardRequest::PollMessages {
-            consumer,
-            partition_id,
+        let payload = ShardRequestPayload::PollMessages {
+            consumer: consumer.clone(),
             args,
             count,
         };
+        let request = ShardRequest::new(stream.stream_id, topic.topic_id, 
partition_id, payload);
         let message = ShardMessage::Request(request);
         let (metadata, batch) = match shard.send_request_to_shard(&namespace, 
message).await {
-            ShardRequestResult::SameShard(message) => {
-                match message {
-                    ShardMessage::Request(request) => {
-                        match request {
-                            ShardRequest::PollMessages {
-                                consumer,
-                                partition_id,
-                                args,
-                                count,
-                            } => {
-                                topic.get_messages(consumer, partition_id, 
args.strategy, count).await?
-                                
-                            }
-                            _ => unreachable!(
-                                "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
-                            ),
-                        }
-                    }
-                    _ => unreachable!(
-                        "Expected a request message inside of an command 
handler, impossible state"
-                    ),
-                }
-            }
-            ShardRequestResult::Result(result) => {
-                match result? {
-                    ShardResponse::PollMessages(response) => {
-                        response
-                    }
-                    ShardResponse::ErrorResponse(err) => {
-                        return Err(err);
+            ShardRequestResult::SameShard(message) => match message {
+                ShardMessage::Request(request) => match request.payload {
+                    ShardRequestPayload::PollMessages {
+                        consumer,
+                        args,
+                        count,
+                    } => {
+                        topic
+                            .get_messages(consumer, partition_id, 
args.strategy, count)
+                            .await?
                     }
                     _ => unreachable!(
-                        "Expected a PollMessages response inside of 
PollMessages handler, impossible state"
+                        "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
                     ),
+                },
+                _ => unreachable!(
+                    "Expected a request message inside of an command handler, 
impossible state"
+                ),
+            },
+            ShardRequestResult::Result(result) => match result? {
+                ShardResponse::PollMessages(response) => response,
+                ShardResponse::ErrorResponse(err) => {
+                    return Err(err);
                 }
-            }
+                _ => unreachable!(
+                    "Expected a PollMessages response inside of PollMessages 
handler, impossible state"
+                ),
+            },
         };
 
-
         // Collect all chunks first into a Vec to extend their lifetimes.
         // This ensures the Bytes (in reality Arc<[u8]>) references from each 
IggyMessagesBatch stay alive
         // throughout the async vectored I/O operation, preventing "borrowed 
value does not live
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 61e2c39c..f4497bf6 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -21,7 +21,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommandHandler};
 use crate::binary::sender::SenderKind;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::frame::ShardResponse;
-use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
 use crate::shard::{IggyShard, ShardRequestResult};
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
@@ -150,12 +150,8 @@ impl ServerCommandHandler for SendMessages {
         let batch = shard.maybe_encrypt_messages(batch)?;
 
         let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, 
partition_id);
-        let request = ShardRequest::SendMessages {
-            stream_id,
-            topic_id,
-            partition_id,
-            batch,
-        };
+        let payload = ShardRequestPayload::SendMessages { batch };
+        let request = ShardRequest::new(stream.stream_id, topic.topic_id, 
partition_id, payload);
         let message = ShardMessage::Request(request);
         // Egh... I don't like those nested match statements,
         // Technically there is only two `request` types that will ever be 
dispatched
@@ -164,43 +160,31 @@ impl ServerCommandHandler for SendMessages {
         // how to make this code reusable, in a sense that we will have 
exactly the same code inside of
         // `PollMessages` handler, but with different request....
         match shard.send_request_to_shard(&namespace, message).await {
-            ShardRequestResult::SameShard(message) => {
-                match message {
-                    ShardMessage::Request(request) => {
-                        match request {
-                            ShardRequest::SendMessages {
-                                stream_id,
-                                topic_id,
-                                partition_id,
-                                batch,
-                            } => {
-                                // Just shut up rust analyzer.
-                                let _stream_id = stream_id;
-                                let _topic_id = topic_id;
-                                topic.append_messages(partition_id, 
batch).await?
-                            }
-                            _ => unreachable!(
-                                "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
-                            ),
+            ShardRequestResult::SameShard(message) => match message {
+                ShardMessage::Request(request) => {
+                    let partition_id = request.partition_id;
+                    match request.payload {
+                        ShardRequestPayload::SendMessages { batch } => {
+                            topic.append_messages(partition_id, batch).await?
                         }
+                        _ => unreachable!(
+                            "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
+                        ),
                     }
-                    _ => unreachable!(
-                        "Expected a request message inside of an command 
handler, impossible state"
-                    ),
                 }
-            }
-            ShardRequestResult::Result(result) => {
-                match result? {
-                    ShardResponse::SendMessages => {
-                        ()
-                    }
-                    ShardResponse::ErrorResponse(err) => {
-                        return Err(err);
-                    }
-                    _ => unreachable!("Expected a SendMessages response inside 
of SendMessages handler, impossible state"),
+                _ => unreachable!(
+                    "Expected a request message inside of an command handler, 
impossible state"
+                ),
+            },
+            ShardRequestResult::Result(result) => match result? {
+                ShardResponse::SendMessages => (),
+                ShardResponse::ErrorResponse(err) => {
+                    return Err(err);
                 }
-
-            }
+                _ => unreachable!(
+                    "Expected a SendMessages response inside of SendMessages 
handler, impossible state"
+                ),
+            },
         };
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs 
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index a00afde7..0e746b76 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateStreamWithId;
 use crate::streaming::session::Session;
@@ -46,28 +47,31 @@ impl ServerCommandHandler for CreateStream {
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id;
-
+        let name = self.name.clone();
         let created_stream_id = shard
-                .create_stream(session, self.stream_id, &self.name)
+                .create_stream(session, stream_id, &name)
                 .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to create 
stream with id: {stream_id:?}, session: {session}"
                     )
                 })?;
+        let event = ShardEvent::CreatedStream { stream_id, name };
+        // Broadcast the event to all shards.
+        let _responses = shard.broadcast_event_to_all_shards(event.into());
+
         let stream = shard.find_stream(session, &created_stream_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to find created 
stream with id: {created_stream_id:?}, session: {session}"
                 )
             })?;
-        let stream_id = stream.stream_id;
         let response = mapper::map_stream(&stream);
 
         shard
             .state
         .apply(session.get_user_id(), 
&EntryCommand::CreateStream(CreateStreamWithId {
-            stream_id,
+            stream_id: stream.stream_id,
             command: self
         }))            .await
             .with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs 
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index db09863e..6785e4f0 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateTopicWithId;
 use crate::streaming::session::Session;
@@ -62,14 +63,27 @@ impl ServerCommandHandler for CreateTopic {
                 .await
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to create topic for stream_id: {stream_id}, topic_id: 
{topic_id:?}"
                 ))?;
+        let event = ShardEvent::CreatedTopic {
+            stream_id: stream_id.clone(),
+            topic_id,
+            name: self.name.clone(),
+            partitions_count: self.partitions_count,
+            message_expiry: self.message_expiry,
+            compression_algorithm: self.compression_algorithm,
+            max_topic_size: self.max_topic_size,
+            replication_factor: self.replication_factor,
+        };
+        // Broadcast the event to all shards.
+        let _responses = shard.broadcast_event_to_all_shards(event.into());
+
         let stream = shard
-            .find_stream(session, &self.stream_id)
+            .get_stream(&self.stream_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get stream for 
stream_id: {stream_id}"
                 )
             })?;
-        let topic = shard.find_topic(session, &stream, &created_topic_id)
+        let topic = stream.get_topic(&created_topic_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get topic with 
ID: {created_topic_id} in stream with ID: {stream_id}"
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs 
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 21cbba49..b6b163b6 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -23,6 +23,7 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use error_set::ErrContext;
@@ -39,19 +40,30 @@ impl ServerCommandHandler for LoginUser {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        length: u32,
+        _length: u32,
         session: &Rc<Session>,
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
+        let LoginUser {
+            username, password, ..
+        } = self;
         let user = shard
-            .login_user(&self.username, &self.password, Some(session))
+            .login_user(&username, &password, Some(session))
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to login user with 
name: {}, session: {session}",
-                    self.username
+                    username
                 )
             })?;
+        let event = ShardEvent::LoginUser {
+            client_id: session.client_id,
+            username,
+            password,
+        };
+        // Broadcast the event to all shards.
+        let _responses = shard.broadcast_event_to_all_shards(event.into());
+
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
         Ok(())
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index ebf8946e..2d9f7eaa 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -16,6 +16,7 @@ use crate::{
     streaming::{
         persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
         users::user::User,
+        utils::file::overwrite,
     },
 };
 use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
@@ -47,6 +48,12 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
     if !Path::new(&state_path).exists() && 
create_dir_all(&state_path).await.is_err() {
         return Err(IggyError::CannotCreateStateDirectory(state_path));
     }
+    let state_log = config.get_state_messages_file_path();
+    if !Path::new(&state_log).exists() {
+        if let Err(_) = overwrite(&state_log).await {
+            return Err(IggyError::CannotCreateStateDirectory(state_log));
+        }
+    }
 
     let streams_path = config.get_streams_path();
     if !Path::new(&streams_path).exists() && 
create_dir_all(&streams_path).await.is_err() {
@@ -68,10 +75,6 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
         config.get_system_path()
     );
     Ok(())
-
-    // TODO: Move this to individual shard level
-    /*
-     */
 }
 
 pub fn create_root_user() -> User {
diff --git a/core/server/src/configs/displays.rs 
b/core/server/src/configs/displays.rs
index 51873421..b65804b5 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -290,10 +290,7 @@ impl Display for SegmentConfig {
         write!(
             f,
             "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, 
archive_expired: {} }}",
-            self.size,
-            self.cache_indexes,
-            self.message_expiry,
-            self.archive_expired,
+            self.size, self.cache_indexes, self.message_expiry, 
self.archive_expired,
         )
     }
 }
diff --git a/core/server/src/io/file.rs b/core/server/src/io/file.rs
index 61e51023..95129aa8 100644
--- a/core/server/src/io/file.rs
+++ b/core/server/src/io/file.rs
@@ -94,14 +94,9 @@ impl AsyncWriteRent for IggyFile {
         (Ok(n), buf)
     }
 
+    // This is bait!!!!
     async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> 
{
-        let slice = match IoVecWrapper::new(buf_vec) {
-            Ok(slice) => slice,
-            Err(buf) => return (Ok(0), buf),
-        };
-
-        let (result, slice) = self.write(slice).await;
-        (result, slice.into_inner())
+        unimplemented!("writev is not implemented for IggyFile");
     }
 
     async fn flush(&mut self) -> std::io::Result<()> {
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 436b3829..5875e6bd 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -26,6 +26,7 @@ use dotenvy::dotenv;
 use error_set::ErrContext;
 use figlet_rs::FIGfont;
 use iggy_common::create_user::CreateUser;
+use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
 use server::args::Args;
 use server::bootstrap::{
@@ -46,6 +47,7 @@ use server::state::command::EntryCommand;
 use server::state::file::FileState;
 use server::state::models::CreateUserWithId;
 use server::state::system::SystemState;
+use server::streaming::utils::{MemoryPool, crypto};
 use server::versioning::SemanticVersion;
 use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str};
 use tokio::time::Instant;
@@ -117,21 +119,27 @@ fn main() -> Result<(), ServerError> {
     let mut logging = Logging::new(config.telemetry.clone());
     logging.early_init();
 
+    // From this point on, we can use tracing macros to log messages.
+    logging.late_init(config.system.get_system_path(), 
&config.system.logging)?;
+
     // TODO: Make this configurable from config as a range
     // for example this instance of Iggy will use cores from 0..4
     let available_cpus = available_parallelism().expect("Failed to get num of 
cores");
     let shards_count = available_cpus.into();
     let shards_set = 0..shards_count;
     let connections = create_shard_connections(shards_set.clone());
+    let gate = Arc::new(Gate::new());
+    let mut handles = Vec::with_capacity(shards_set.len());
     for shard_id in shards_set {
-        let gate: Arc<Gate<()>> = Arc::new(Gate::new());
         let id = shard_id as u16;
+        let gate = gate.clone();
         let connections = connections.clone();
         let config = config.clone();
         let state_persister = 
resolve_persister(config.system.state.enforce_fsync);
-        std::thread::Builder::new()
+        let handle = std::thread::Builder::new()
             .name(format!("shard-{id}"))
             .spawn(move || {
+                MemoryPool::init_pool(config.system.clone());
                 monoio::utils::bind_to_cpu_set(Some(shard_id))
                     .expect(format!("Failed to set CPU affinity for 
shard-{id}").as_str());
 
@@ -159,7 +167,6 @@ fn main() -> Result<(), ServerError> {
 
                     // We can't use std::sync::Once because it doesn't support 
async.
                     // Trait bound on the closure is FnOnce.
-                    let gate = gate.clone();
                     // Peak into the state to check if the root user exists.
                     // If it does not exist, create it.
                     gate.with_async::<Result<(), IggyError>>(async 
|gate_state| {
@@ -180,9 +187,7 @@ fn main() -> Result<(), ServerError> {
                                 entry
                                     .command()
                                     .and_then(|command| match command {
-                                        EntryCommand::CreateUser(payload)
-                                            if payload.command.username
-                                            == IGGY_ROOT_USERNAME_ENV && 
payload.command.password == IGGY_ROOT_PASSWORD_ENV =>
+                                        EntryCommand::CreateUser(payload) if 
payload.user_id == DEFAULT_ROOT_USER_ID =>
                                         {
                                             Ok(true)
                                         }
@@ -238,19 +243,20 @@ fn main() -> Result<(), ServerError> {
                         .into();
 
                     //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
-                    shard.run().await.expect("Failed to run shard");
+                    if let Err(e) = shard.run().await {
+                        error!("Failed to run shard-{id}: {e}");
+                    } 
                     //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
-                    shard.assert_init();
-                    let shard = Rc::new(shard);
+                    //shard.assert_init();
                 })
             })
-            .expect(format!("Failed to spawn thread for shard-{id}").as_str())
-            .join()
-            .expect(format!("Failed to join thread for shard-{id}").as_str());
+            .expect(format!("Failed to spawn thread for shard-{id}").as_str());
+        handles.push(handle);
     }
 
-    // From this point on, we can use tracing macros to log messages.
-    logging.late_init(config.system.get_system_path(), 
&config.system.logging)?;
+    handles.into_iter().for_each(|handle| {
+        handle.join().expect("Failed to join shard thread");
+    });
 
     /*
     #[cfg(feature = "disable-mimalloc")]
@@ -260,7 +266,6 @@ fn main() -> Result<(), ServerError> {
     #[cfg(not(feature = "disable-mimalloc"))]
     info!("Using mimalloc allocator");
 
-    MemoryPool::init_pool(config.system.clone());
 
     let system = SharedSystem::new(System::new(
         config.system.clone(),
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index a2a3fdfb..beda9ae2 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -113,7 +113,7 @@ impl IggyShardBuilder {
             version: version,
             stop_receiver: stop_receiver,
             stop_sender: stop_sender,
-            frame_receiver: Cell::new(Some(frame_receiver)),
+            messages_receiver: Cell::new(Some(frame_receiver)),
             metrics: Metrics::init(),
 
             users: Default::default(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 7976cca3..cfbd6eac 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -20,13 +20,14 @@ pub mod builder;
 pub mod gate;
 pub mod namespace;
 pub mod system;
+pub mod tasks;
 pub mod transmission;
 
 use ahash::{AHashMap, AHashSet, HashMap};
 use builder::IggyShardBuilder;
 use error_set::ErrContext;
 use futures::future::try_join_all;
-use iggy_common::{EncryptorKind, IggyError, UserId};
+use iggy_common::{EncryptorKind, Identifier, IggyError, UserId};
 use namespace::IggyNamespace;
 use std::{
     cell::{Cell, RefCell},
@@ -46,23 +47,29 @@ use crate::{
     configs::server::ServerConfig,
     shard::{
         system::info::SystemInfo,
+        tasks::messages::spawn_shard_message_task,
         transmission::{
+            event::ShardEvent,
             frame::{ShardFrame, ShardResponse},
-            message::{ShardEvent, ShardMessage, ShardRequest},
+            message::{ShardMessage, ShardRequest, ShardRequestPayload},
         },
     },
     state::{
-        file::FileState, system::{StreamState, SystemState, UserState}, 
StateKind
+        StateKind,
+        file::FileState,
+        system::{StreamState, SystemState, UserState},
     },
     streaming::{
         clients::client_manager::ClientManager,
         diagnostics::metrics::Metrics,
+        partitions::partition,
         personal_access_tokens::personal_access_token::PersonalAccessToken,
         session::Session,
         storage::SystemStorage,
         streams::stream::Stream,
         users::{permissioner::Permissioner, user::User},
     },
+    tcp::tcp_server::spawn_tcp_server,
     versioning::SemanticVersion,
 };
 
@@ -90,16 +97,15 @@ impl Shard {
             .sender
             .send(ShardFrame::new(message, Some(sender.clone()))); // 
Apparently sender needs to be cloned, otherwise channel will close...
         //TODO: Fixme
-        let response = receiver.recv().await
-            .map_err(|err| {
-                error!("Failed to receive response from shard: {err}");
-                IggyError::ShardCommunicationError(self.id)
-            })?;
+        let response = receiver.recv().await.map_err(|err| {
+            error!("Failed to receive response from shard: {err}");
+            IggyError::ShardCommunicationError(self.id)
+        })?;
         Ok(response)
     }
 }
 
-struct ShardInfo {
+pub struct ShardInfo {
     id: u16,
 }
 
@@ -135,7 +141,7 @@ pub struct IggyShard {
     pub(crate) users: RefCell<HashMap<UserId, User>>,
 
     pub(crate) metrics: Metrics,
-    pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>,
+    pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
     stop_receiver: StopReceiver,
     stop_sender: StopSender,
 }
@@ -172,17 +178,16 @@ impl IggyShard {
         // have the correct statistics when the server starts.
         //self.get_stats().await?;
         self.init().await?;
-        self.assert_init();
+        // TODO: Fixme
+        //self.assert_init();
         info!("Initiated shard with ID: {}", self.id);
         // Create all tasks (tcp listener, http listener, command processor, 
in the future also the background jobs).
-        /*
-        let mut tasks: Vec<Task> = 
vec![Box::pin(spawn_shard_message_task(shard.clone()))];
+        let mut tasks: Vec<Task> = 
vec![Box::pin(spawn_shard_message_task(self.clone()))];
         if self.config.tcp.enabled {
             tasks.push(Box::pin(spawn_tcp_server(self.clone())));
         }
         let result = try_join_all(tasks).await;
         result?;
-        */
 
         Ok(())
     }
@@ -427,6 +432,7 @@ impl IggyShard {
 
     #[instrument(skip_all, name = "trace_shutdown")]
     pub async fn shutdown(&mut self) -> Result<(), IggyError> {
+        //TODO: Fixme, impl cooperative shutdown.
         self.persist_messages().await?;
         Ok(())
     }
@@ -449,6 +455,81 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
+    pub async fn handle_shard_message(&self, message: ShardMessage) -> 
Option<ShardResponse> {
+        match message {
+            ShardMessage::Request(request) => match 
self.handle_request(request).await {
+                Ok(response) => Some(response),
+                Err(err) => Some(ShardResponse::ErrorResponse(err)),
+            },
+            ShardMessage::Event(event) => match self.handle_event(event).await 
{
+                Ok(_) => Some(ShardResponse::Event),
+                Err(err) => Some(ShardResponse::ErrorResponse(err)),
+            },
+        }
+    }
+
+    async fn handle_request(&self, request: ShardRequest) -> 
Result<ShardResponse, IggyError> {
+        let stream = 
self.get_stream(&Identifier::numeric(request.stream_id)?)?;
+        let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+        let partition_id = request.partition_id;
+        match request.payload {
+            ShardRequestPayload::SendMessages { batch } => {
+                topic.append_messages(partition_id, batch).await?;
+                Ok(ShardResponse::SendMessages)
+            }
+            ShardRequestPayload::PollMessages {
+                args,
+                consumer,
+                count,
+            } => {
+                let (metadata, batch) = topic
+                    .get_messages(consumer, partition_id, args.strategy, count)
+                    .await?;
+                Ok(ShardResponse::PollMessages((metadata, batch)))
+            }
+        }
+    }
+
+    async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(), 
IggyError> {
+        match &*event {
+            ShardEvent::CreatedStream { stream_id, name } => {
+                self.create_stream_bypass_auth(*stream_id, name).await
+            }
+            ShardEvent::CreatedTopic {
+                stream_id,
+                topic_id,
+                name,
+                partitions_count,
+                message_expiry,
+                compression_algorithm,
+                max_topic_size,
+                replication_factor,
+            } => {
+                self.create_topic_bypass_auth(
+                    stream_id,
+                    *topic_id,
+                    name,
+                    *partitions_count,
+                    *message_expiry,
+                    *compression_algorithm,
+                    *max_topic_size,
+                    *replication_factor,
+                )
+                .await
+            }
+            ShardEvent::LoginUser {
+                client_id,
+                username,
+                password,
+            } => self.login_user_event(*client_id, username, password),
+            ShardEvent::NewSession { address, transport } => {
+                let session = self.add_client(address, *transport);
+                self.add_active_session(session);
+                Ok(())
+            }
+        }
+    }
+
     pub async fn send_request_to_shard(
         &self,
         namespace: &IggyNamespace,
@@ -479,6 +560,25 @@ impl IggyShard {
         }
     }
 
+    pub fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) -> 
Vec<ShardResponse> {
+        self.shards
+            .iter()
+            .filter_map(|shard| {
+                if shard.id != self.id {
+                    Some(shard.connection.clone())
+                } else {
+                    None
+                }
+            })
+            .map(|conn| {
+                // TODO: Fixme, maybe we should send response_sender
+                // and propagate errors back.
+                conn.send(ShardFrame::new(event.clone().into(), None));
+                ShardResponse::Event
+            })
+            .collect()
+    }
+
     fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
         let shards_table = self.shards_table.borrow();
         shards_table.get(namespace).map(|shard_info| {
@@ -496,27 +596,6 @@ impl IggyShard {
         self.shards_table.borrow_mut().extend(records);
     }
 
-    pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: 
ShardEvent) {
-        self.shards
-            .iter()
-            .filter_map(|shard| {
-                if shard.id != self.id {
-                    Some(shard.connection.clone())
-                } else {
-                    None
-                }
-            })
-            .map(|conn| {
-                //TODO: Fixme
-                /*
-                let message = ShardMessage::Event(event);
-                conn.send(ShardFrame::new(client_id, message, None));
-                */
-                ()
-            })
-            .collect::<Vec<_>>();
-    }
-
     pub fn add_active_session(&self, session: Rc<Session>) {
         self.active_sessions.borrow_mut().push(session);
     }
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 76337a1a..36ae234b 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,8 +25,8 @@ use crate::streaming::utils::PooledBuffer;
 use async_zip::tokio::read::stream;
 use error_set::ErrContext;
 use iggy_common::{
-    BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier,
-    IggyError, Partitioning, PollingStrategy,
+    BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier, IggyError,
+    Partitioning, PollingStrategy,
 };
 use tracing::{error, trace};
 
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index fecec536..47068f11 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -186,6 +186,19 @@ impl IggyShard {
         }))
     }
 
+    pub async fn create_stream_bypass_auth(
+        &self,
+        stream_id: Option<u32>,
+        name: &str,
+    ) -> Result<(), IggyError> {
+        let stream = self.create_stream_base(stream_id, name)?;
+        let id = stream.stream_id;
+        self.streams_ids.borrow_mut().insert(name.to_owned(), id);
+        self.streams.borrow_mut().insert(id, stream);
+        self.metrics.increment_streams(1);
+        Ok(())
+    }
+
     pub async fn create_stream(
         &self,
         session: &Session,
@@ -199,6 +212,25 @@ impl IggyShard {
         if self.streams_ids.borrow().contains_key(name) {
             return Err(IggyError::StreamNameAlreadyExists(name.to_owned()));
         }
+        let stream = self
+            .create_stream_base(stream_id, name)
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to create 
stream with name: {name}")
+            })?;
+        let id = stream.stream_id;
+
+        stream.persist().await?;
+        info!("Created stream with ID: {id}, name: '{name}'.");
+        self.streams_ids.borrow_mut().insert(name.to_owned(), id);
+        self.streams.borrow_mut().insert(id, stream);
+        self.metrics.increment_streams(1);
+        Ok(Identifier::numeric(id)?)
+    }
+
+    fn create_stream_base(&self, stream_id: Option<u32>, name: &str) -> 
Result<Stream, IggyError> {
+        if self.streams_ids.borrow().contains_key(name) {
+            return Err(IggyError::StreamNameAlreadyExists(name.to_owned()));
+        }
 
         let mut id;
         if stream_id.is_none() {
@@ -222,14 +254,7 @@ impl IggyShard {
         }
 
         let stream = Stream::create(id, name, self.config.system.clone(), 
self.storage.clone());
-        stream.persist().await?;
-        info!("Created stream with ID: {id}, name: '{name}'.");
-        self.streams_ids
-            .borrow_mut()
-            .insert(name.to_owned(), stream.stream_id);
-        self.streams.borrow_mut().insert(stream.stream_id, stream);
-        self.metrics.increment_streams(1);
-        Ok(Identifier::numeric(id)?)
+        Ok(stream)
     }
 
     pub async fn update_stream(
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index 252dae30..c247cf74 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -101,6 +101,52 @@ impl IggyShard {
         Ok(Some(topic))
     }
 
+    pub async fn create_topic_bypass_auth(
+        &self,
+        stream_id: &Identifier,
+        topic_id: Option<u32>,
+        name: &str,
+        partitions_count: u32,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: Option<u8>,
+    ) -> Result<(), IggyError> {
+        let (stream_id, topic_id, partition_ids) = self
+            .create_topic_base(
+                stream_id,
+                topic_id,
+                name,
+                partitions_count,
+                message_expiry,
+                compression_algorithm,
+                max_topic_size,
+                replication_factor,
+            )
+            .await?;
+        // TODO: Figure out a way how to distribute the shards table among 
different shards,
+        // without the need to do code from below, everytime we handle a 
`ShardEvent`.
+        // I think we shouldn't be sharing it, maintain a single shard table 
per shard,
+        // but figure out a way how to distribute it smarter (maybe move the 
broadcast inside of the `insert_shard_table_records`) method.
+        let records = partition_ids.into_iter().map(|partition_id| {
+            let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
+            // TODO: This setup isn't deterministic.
+            // Imagine a scenario where client creates partition using 
`String` identifiers,
+            // but then for poll_messages requests uses numeric ones.
+            // the namespace wouldn't match, therefore we would get miss in 
the shard table.
+            let hash = namespace.generate_hash();
+            let shard_id = hash % self.get_available_shards_count();
+            let shard_info = ShardInfo::new(shard_id as u16);
+            (namespace, shard_info)
+        });
+        self.insert_shard_table_records(records);
+
+        self.metrics.increment_topics(1);
+        self.metrics.increment_partitions(partitions_count);
+        self.metrics.increment_segments(partitions_count);
+        Ok(())
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub async fn create_topic(
         &self,
@@ -130,26 +176,18 @@ impl IggyShard {
                 })?;
         }
 
-        // TODO: Make create topic sync, and extract the storage persister out 
of it
-        // perform disk i/o outside of the borrow_mut of the stream.
-        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}")
-        })?;
-        let stream_id = stream.stream_id;
-        let (topic_id, partition_ids) = stream
-            .create_topic(
+        let (stream_id, topic_id, partition_ids) = self
+            .create_topic_base(
+                stream_id,
                 topic_id,
                 name,
                 partitions_count,
                 message_expiry,
                 compression_algorithm,
                 max_topic_size,
-                replication_factor.unwrap_or(1),
+                replication_factor,
             )
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to create topic 
with name: {name} in stream ID: {stream_id}")
-            })?;
+            .await?;
         let records = partition_ids.into_iter().map(|partition_id| {
             let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
             // TODO: This setup isn't deterministic.
@@ -170,6 +208,40 @@ impl IggyShard {
         Ok(Identifier::numeric(topic_id)?)
     }
 
+    async fn create_topic_base(
+        &self,
+        stream_id: &Identifier,
+        topic_id: Option<u32>,
+        name: &str,
+        partitions_count: u32,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: Option<u8>,
+    ) -> Result<(u32, u32, Vec<u32>), IggyError> {
+        // TODO: Make create topic sync, and extract the storage persister out 
of it
+        // perform disk i/o outside of the borrow_mut of the stream.
+        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let (topic_id, partition_ids) = stream
+            .create_topic(
+                topic_id,
+                name,
+                partitions_count,
+                message_expiry,
+                compression_algorithm,
+                max_topic_size,
+                replication_factor.unwrap_or(1),
+            )
+            .await
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to create topic 
with name: {name} in stream ID: {stream_id}")
+            })?;
+        Ok((stream_id, topic_id, partition_ids))
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub async fn update_topic(
         &self,
diff --git a/core/server/src/shard/system/users.rs 
b/core/server/src/shard/system/users.rs
index f3b271a7..f0e87fc0 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -372,6 +372,21 @@ impl IggyShard {
         Ok(())
     }
 
+    pub fn login_user_event(
+        &self,
+        client_id: u32,
+        username: &str,
+        password: &str,
+    ) -> Result<(), IggyError> {
+        let active_sessions = self.active_sessions.borrow();
+        let session = active_sessions
+            .iter()
+            .find(|s| s.client_id == client_id)
+            .expect(format!("At this point session for {}, should exist.", 
client_id).as_str());
+        self.login_user_with_credentials(username, Some(password), 
Some(session))?;
+        Ok(())
+    }
+
     pub fn login_user(
         &self,
         username: &str,
diff --git a/core/server/src/shard/tasks/messages.rs 
b/core/server/src/shard/tasks/messages.rs
new file mode 100644
index 00000000..87660290
--- /dev/null
+++ b/core/server/src/shard/tasks/messages.rs
@@ -0,0 +1,38 @@
+use futures::StreamExt;
+use iggy_common::IggyError;
+use std::rc::Rc;
+use tracing::error;
+
+use crate::shard::{IggyShard, transmission::frame::ShardFrame};
+
+async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
+    let mut messages_receiver = shard.messages_receiver.take().unwrap();
+    loop {
+        if let Some(frame) = messages_receiver.next().await {
+            let ShardFrame {
+                message,
+                response_sender,
+            } = frame;
+            match (shard.handle_shard_message(message).await, response_sender) 
{
+                (Some(response), Some(response_sender)) => {
+                    response_sender
+                        .send(response)
+                        .await
+                        .expect("Failed to send response back to origin 
shard.");
+                }
+                _ => {}
+            };
+        }
+    }
+}
+
+pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
+    monoio::spawn(async move {
+        let result = run_shard_messages_receiver(shard).await;
+        if let Err(err) = &result {
+            error!("Error running shard: {err}");
+        }
+        result
+    })
+    .await
+}
diff --git a/core/server/src/shard/tasks/mod.rs 
b/core/server/src/shard/tasks/mod.rs
new file mode 100644
index 00000000..ba63992f
--- /dev/null
+++ b/core/server/src/shard/tasks/mod.rs
@@ -0,0 +1 @@
+pub mod messages;
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 45815a50..49794c3f 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -4,26 +4,27 @@ use iggy_common::{CompressionAlgorithm, Identifier, 
IggyExpiry, MaxTopicSize};
 
 use crate::streaming::clients::client_manager::Transport;
 
+#[derive(Debug)]
 pub enum ShardEvent {
     CreatedStream {
         stream_id: Option<u32>,
-        topic_id: Identifier
+        name: String,
     },
     //DeletedStream(Identifier),
     //UpdatedStream(Identifier, String),
     //PurgedStream(Identifier),
     //CreatedPartitions(Identifier, Identifier, u32),
     //DeletedPartitions(Identifier, Identifier, u32),
-    CreatedTopic(
-        Identifier,
-        Option<u32>,
-        String,
-        u32,
-        IggyExpiry,
-        CompressionAlgorithm,
-        MaxTopicSize,
-        Option<u8>,
-    ),
+    CreatedTopic {
+        stream_id: Identifier,
+        topic_id: Option<u32>,
+        name: String,
+        partitions_count: u32,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: Option<u8>,
+    },
     //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
     //DeletedConsumerGroup(Identifier, Identifier, Identifier),
     /*
@@ -42,6 +43,7 @@ pub enum ShardEvent {
     //CreatedUser(String, String, UserStatus, Option<Permissions>),
     //DeletedUser(Identifier),
     LoginUser {
+        client_id: u32,
         username: String,
         password: String,
     },
@@ -53,8 +55,7 @@ pub enum ShardEvent {
     //LoginWithPersonalAccessToken(String),
     //StoredConsumerOffset(Identifier, Identifier, PollingConsumer, u64),
     NewSession {
-        user_id: u32,
-        socket_addr: SocketAddr,
+        address: SocketAddr,
         transport: Transport,
     },
-}
\ No newline at end of file
+}
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index ff519ab4..6539ec4c 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -18,13 +18,16 @@
 use async_channel::Sender;
 use iggy_common::IggyError;
 
-use 
crate::{binary::handlers::messages::poll_messages_handler::IggyPollMetadata, 
shard::transmission::message::ShardMessage, 
streaming::segments::IggyMessagesBatchSet};
+use crate::{
+    binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
+    shard::transmission::message::ShardMessage, 
streaming::segments::IggyMessagesBatchSet,
+};
 
 #[derive(Debug)]
 pub enum ShardResponse {
     PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
     SendMessages,
-    ShardEvent,
+    Event,
     ErrorResponse(IggyError),
 }
 
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index 360e7a3c..aa196fde 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,4 +1,4 @@
-use std::rc::Rc;
+use std::{rc::Rc, sync::Arc};
 
 use iggy_common::PollingStrategy;
 
@@ -19,29 +19,47 @@ use iggy_common::PollingStrategy;
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::{shard::system::messages::PollingArgs, 
streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut, 
session::Session}};
+use crate::{
+    shard::{system::messages::PollingArgs, transmission::event::ShardEvent},
+    streaming::{polling_consumer::PollingConsumer, 
segments::IggyMessagesBatchMut},
+};
 
 #[derive(Debug)]
 pub enum ShardMessage {
     Request(ShardRequest),
-    Event(ShardEvent),
+    Event(Arc<ShardEvent>),
 }
 
 #[derive(Debug)]
-pub enum ShardEvent {
-    NewSession(),
+pub struct ShardRequest {
+    pub stream_id: u32,
+    pub topic_id: u32,
+    pub partition_id: u32,
+    pub payload: ShardRequestPayload,
 }
 
-#[derive(Debug)]
-pub enum ShardRequest {
-    SendMessages {
+impl ShardRequest {
+    pub fn new(
         stream_id: u32,
         topic_id: u32,
         partition_id: u32,
+        payload: ShardRequestPayload,
+    ) -> Self {
+        Self {
+            stream_id,
+            topic_id,
+            partition_id,
+            payload,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum ShardRequestPayload {
+    SendMessages {
         batch: IggyMessagesBatchMut,
     },
     PollMessages {
-        partition_id: u32,
         args: PollingArgs,
         consumer: PollingConsumer,
         count: u32,
@@ -54,8 +72,8 @@ impl From<ShardRequest> for ShardMessage {
     }
 }
 
-impl From<ShardEvent> for ShardMessage {
-    fn from(event: ShardEvent) -> Self {
+impl From<Arc<ShardEvent>> for ShardMessage {
+    fn from(event: Arc<ShardEvent>) -> Self {
         ShardMessage::Event(event)
     }
 }
diff --git a/core/server/src/shard/transmission/mod.rs 
b/core/server/src/shard/transmission/mod.rs
index 4ed9bd1e..107bdcbd 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-pub mod event;
 pub mod connector;
+pub mod event;
 pub mod frame;
 pub mod message;
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index b845d877..e8eb6ee6 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -117,7 +117,6 @@ impl State for FileState {
                 )
             })
             .map_err(|_| IggyError::CannotReadFile)?;
-        let file = IggyFile::new(file);
         let file_size = file
             .metadata()
             .await
@@ -129,6 +128,8 @@ impl State for FileState {
             })
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len();
+
+        let file = IggyFile::new(file);
         if file_size == 0 {
             info!("State file is empty");
             return Ok(Vec::new());
@@ -150,7 +151,8 @@ impl State for FileState {
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
index. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 8;
-            if entries_count > 0 && index != current_index + 1 {
+            // Greater than one, because one of the entries after a fresh 
reboot is the default root user.
+            if entries_count > 1 && index != current_index + 1 {
                 error!(
                     "State file is corrupted, expected index: {}, got: {}",
                     current_index + 1,
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs 
b/core/server/src/streaming/segments/indexes/index_reader.rs
index c373ce0f..240e62a1 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -17,7 +17,7 @@
  */
 
 use super::IggyIndexesMut;
-use crate::streaming::utils::PooledBuffer;
+use crate::{io::file::IggyFile, streaming::utils::PooledBuffer};
 use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView};
@@ -30,15 +30,14 @@ use std::{
         atomic::{AtomicU64, Ordering},
     },
 };
-use tokio::fs::OpenOptions;
-use tokio::task::spawn_blocking;
+use monoio::fs::OpenOptions;
 use tracing::{error, trace};
 
 /// A dedicated struct for reading from the index file.
 #[derive(Debug)]
 pub struct IndexReader {
     file_path: String,
-    file: Arc<StdFile>,
+    file: IggyFile,
     index_size_bytes: Arc<AtomicU64>,
 }
 
@@ -52,13 +51,14 @@ impl IndexReader {
             .with_error_context(|error| format!("Failed to open index file: 
{file_path}. {error}"))
             .map_err(|_| IggyError::CannotReadFile)?;
 
+        let file = IggyFile::new(file);
         trace!(
             "Opened index file for reading: {file_path}, size: {}",
             index_size_bytes.load(Ordering::Acquire)
         );
         Ok(Self {
             file_path: file_path.to_string(),
-            file: Arc::new(file.into_std().await),
+            file,
             index_size_bytes,
         })
     }
@@ -334,21 +334,19 @@ impl IndexReader {
         len: u32,
         use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-        let file = self.file.clone();
-        spawn_blocking(move || {
             if use_pool {
                 let mut buf = PooledBuffer::with_capacity(len as usize);
                 unsafe { buf.set_len(len as usize) };
-                file.read_exact_at(&mut buf, offset as u64)?;
+                let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+                result?;
                 Ok(buf)
             } else {
                 let mut buf = BytesMut::with_capacity(len as usize);
                 unsafe { buf.set_len(len as usize) };
-                file.read_exact_at(&mut buf, offset as u64)?;
+                let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+                result?;
                 Ok(PooledBuffer::from_existing(buf))
             }
-        })
-        .await?
     }
 
     /// Gets the nth index from the index file.
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs 
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 71b18b6d..120bfe4e 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -19,21 +19,22 @@
 use error_set::ErrContext;
 use iggy_common::INDEX_SIZE;
 use iggy_common::IggyError;
+use monoio::fs::OpenOptions;
+use monoio::io::AsyncWriteRentExt;
 use std::sync::{
     Arc,
     atomic::{AtomicU64, Ordering},
 };
-use tokio::{
-    fs::{File, OpenOptions},
-    io::AsyncWriteExt,
-};
 use tracing::trace;
 
+use crate::io::file::IggyFile;
+use crate::streaming::utils::PooledBuffer;
+
 /// A dedicated struct for writing to the index file.
 #[derive(Debug)]
 pub struct IndexWriter {
     file_path: String,
-    file: File,
+    file: IggyFile,
     index_size_bytes: Arc<AtomicU64>,
     fsync: bool,
 }
@@ -55,6 +56,7 @@ impl IndexWriter {
             .with_error_context(|error| format!("Failed to open index file: 
{file_path}. {error}"))
             .map_err(|_| IggyError::CannotReadFile)?;
 
+        let file = IggyFile::from(file);
         if file_exists {
             let _ = file.sync_all().await.with_error_context(|error| {
                 format!("Failed to fsync index file after creation: 
{file_path}. {error}",)
@@ -86,16 +88,18 @@ impl IndexWriter {
     }
 
     /// Appends multiple index buffer to the index file in a single operation.
-    pub async fn save_indexes(&mut self, indexes: &[u8]) -> Result<(), 
IggyError> {
+    pub async fn save_indexes(&mut self, indexes: PooledBuffer) -> Result<(), 
IggyError> {
         if indexes.is_empty() {
             return Ok(());
         }
 
         let count = indexes.len() / INDEX_SIZE;
+        let len =  indexes.len();
 
         self.file
             .write_all(indexes)
             .await
+            .0
             .with_error_context(|error| {
                 format!(
                     "Failed to write {} indexes to file: {}. {error}",
@@ -105,7 +109,7 @@ impl IndexWriter {
             .map_err(|_| IggyError::CannotSaveIndexToSegment)?;
 
         self.index_size_bytes
-            .fetch_add(indexes.len() as u64, Ordering::Release);
+            .fetch_add(len  as u64, Ordering::Release);
 
         if self.fsync {
             let _ = self.fsync().await;
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs 
b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 0dacb907..1b5e8e1e 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -221,9 +221,12 @@ impl IggyIndexesMut {
     }
 
     /// Gets the unsaved part of the index buffer
-    pub fn unsaved_slice(&self) -> &[u8] {
+    pub fn unsaved_slice(&self) -> PooledBuffer {
         let start_pos = self.saved_count as usize * INDEX_SIZE;
-        &self.buffer[start_pos..]
+        // TODO: Consider adding a `split` method that divides the 
underlying Indexes buffer
+        // into two parts: the portion already saved on disk
+        // and the portion not yet saved.
+        PooledBuffer::from(&self.buffer[start_pos..])
     }
 
     /// Mark all indexes as saved to disk
diff --git a/core/server/src/streaming/segments/messages/mod.rs 
b/core/server/src/streaming/segments/messages/mod.rs
index 512c3894..b79b1af0 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,71 +19,35 @@
 mod messages_reader;
 mod messages_writer;
 
-use crate::{io::file::IggyFile, to_iovec};
+use crate::{io::file::IggyFile};
 
 use super::IggyMessagesBatchSet;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::io::AsyncWriteRent;
-use std::{io::IoSlice, mem::take};
+use monoio::io::AsyncWriteRentExt;
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 
-use nix::libc::iovec;
 
 /// Vectored write a batches of messages to file
 async fn write_batch(
     file: &mut IggyFile,
     file_path: &str,
-    batches: IggyMessagesBatchSet,
+    mut batches: IggyMessagesBatchSet,
 ) -> Result<usize, IggyError> {
-    let mut slices = batches.iter().map(|b| 
to_iovec(&b)).collect::<Vec<iovec>>();
+    //let mut slices = batches.iter().map(|b| 
to_iovec(&b)).collect::<Vec<iovec>>();
     let mut total_written = 0;
-
-    loop {
-        let (result, vomited) = file.writev(slices).await;
-        slices = vomited;
-        let bytes_written = result
-            .with_error_context(|error| {
-                format!("Failed to write messages to file: {file_path}, error: 
{error}",)
-            })
-            .map_err(|_| IggyError::CannotWriteToFile)?;
-
-        total_written += bytes_written;
-        advance_slices(&mut slices.as_mut_slice(), bytes_written);
-        if slices.is_empty() {
-            break;
-        }
+    // TODO: Fork monoio to restore vectored (writev) writes for these batches.
+    for batch in batches.iter_mut()  {
+        let messages = batch.take_messages();
+        let writen = 
file.write_all(messages).await.0.with_error_context(|error| {
+            format!(
+                "Failed to write messages to file: {file_path}, error: 
{error}",
+            )
+            // TODO: Better error variant.
+        }).map_err(|_| IggyError::CannotAppendMessage)?;
+        total_written += writen;
     }
-
     Ok(total_written)
 }
-
-fn advance_slices(mut bufs: &mut [iovec], n: usize) {
-    // Number of buffers to remove.
-    let mut remove = 0;
-    // Remaining length before reaching n. This prevents overflow
-    // that could happen if the length of slices in `bufs` were instead
-    // accumulated. Those slice may be aliased and, if they are large
-    // enough, their added length may overflow a `usize`.
-    let mut left = n;
-    for buf in bufs.iter() {
-        if let Some(remainder) = left.checked_sub(buf.iov_len as _) {
-            left = remainder;
-            remove += 1;
-        } else {
-            break;
-        }
-    }
-
-    bufs = &mut bufs[remove..];
-    if bufs.is_empty() {
-        assert!(left == 0, "advancing io slices beyond their length");
-    } else {
-        unsafe {
-            bufs[0].iov_len -= n;
-            bufs[0].iov_base = bufs[0].iov_base.add(n);
-        }
-    }
-}
diff --git a/core/server/src/streaming/segments/segment.rs 
b/core/server/src/streaming/segments/segment.rs
index 92dd254f..9887df90 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -313,7 +313,8 @@ impl Segment {
         }
 
         if let Some(index_writer) = self.index_writer.take() {
-            tokio::spawn(async move {
+            // TODO: Verify whether spawning a task here is the right approach.
+            monoio::spawn(async move {
                 let _ = index_writer.fsync().await;
                 drop(index_writer)
             });
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs 
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index ea549291..6a76b582 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -264,6 +264,10 @@ impl IggyMessagesBatchMut {
         (indexes, messages)
     }
 
+    pub fn take_messages(&mut self) ->  PooledBuffer {
+        std::mem::take(&mut self.messages)
+    }
+
     /// Take the indexes from the batch
     pub fn take_indexes(&mut self) -> IggyIndexesMut {
         std::mem::take(&mut self.indexes)
diff --git a/core/server/src/streaming/segments/writing_messages.rs 
b/core/server/src/streaming/segments/writing_messages.rs
index 9c385b02..8313f589 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -99,6 +99,7 @@ impl Segment {
         self.last_index_position += saved_bytes.as_bytes_u64() as u32;
 
         let unsaved_indexes_slice = self.indexes.unsaved_slice();
+        let len = unsaved_indexes_slice.len();
         self.index_writer
             .as_mut()
             .expect("Index writer not initialized")
@@ -107,7 +108,7 @@ impl Segment {
             .with_error_context(|error| {
                 format!(
                     "Failed to save index of {} indexes to {self}. {error}",
-                    unsaved_indexes_slice.len()
+                    len
                 )
             })?;
 
diff --git a/core/server/src/streaming/streams/storage.rs 
b/core/server/src/streaming/streams/storage.rs
index ebd4946a..5bf4c554 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -20,7 +20,6 @@ use crate::state::system::StreamState;
 use crate::streaming::storage::StreamStorage;
 use crate::streaming::streams::COMPONENT;
 use crate::streaming::streams::stream::Stream;
-use crate::streaming::topics::topic::CreatedTopicInfo;
 use crate::streaming::topics::topic::Topic;
 use ahash::AHashSet;
 use error_set::ErrContext;
@@ -28,11 +27,11 @@ use futures::future::join_all;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
 use serde::{Deserialize, Serialize};
+use tokio::sync::Mutex;
 use std::path::Path;
 use std::sync::Arc;
-use tokio::fs;
-use tokio::fs::create_dir_all;
-use tokio::sync::Mutex;
+use monoio::fs;
+use monoio::fs::create_dir_all;
 use tracing::{error, info, warn};
 
 #[derive(Debug)]
@@ -52,13 +51,13 @@ impl StreamStorage for FileStreamStorage {
         }
 
         let mut unloaded_topics = Vec::new();
-        let dir_entries = fs::read_dir(&stream.topics_path).await;
+        let dir_entries = std::fs::read_dir(&stream.topics_path);
         if dir_entries.is_err() {
             return Err(IggyError::CannotReadTopics(stream.stream_id));
         }
 
         let mut dir_entries = dir_entries.unwrap();
-        while let Some(dir_entry) = 
dir_entries.next_entry().await.unwrap_or(None) {
+        while let Some(dir_entry) = 
dir_entries.next().transpose().unwrap_or(None) {
             let name = dir_entry.file_name().into_string().unwrap();
             let topic_id = name.parse::<u32>();
             if topic_id.is_err() {
@@ -73,7 +72,8 @@ impl StreamStorage for FileStreamStorage {
                 error!(
                     "Topic with ID: '{topic_id}' for stream with ID: 
'{stream_id}' was not found in state, but exists on disk and will be removed."
                 );
-                if let Err(error) = 
fs::remove_dir_all(&dir_entry.path()).await {
+                // TODO: Replace this with the dir walk impl that is mentioned 
in the main function.
+                if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) 
{
                     error!("Cannot remove topic directory: {error}");
                 } else {
                     warn!(
@@ -151,6 +151,7 @@ impl StreamStorage for FileStreamStorage {
             }
         }
 
+        // TODO: Refactor....
         let loaded_topics = Arc::new(Mutex::new(Vec::new()));
         let mut load_topics = Vec::new();
         for mut topic in unloaded_topics {
@@ -222,7 +223,7 @@ impl StreamStorage for FileStreamStorage {
 
     async fn delete(&self, stream: &Stream) -> Result<(), IggyError> {
         info!("Deleting stream with ID: {}...", stream.stream_id);
-        if fs::remove_dir_all(&stream.path).await.is_err() {
+        if std::fs::remove_dir_all(&stream.path).is_err() {
             return 
Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id));
         }
         info!("Deleted stream with ID: {}.", stream.stream_id);
diff --git a/core/server/src/streaming/topics/messages.rs 
b/core/server/src/streaming/topics/messages.rs
index 2f8f0557..f56ed560 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -25,8 +25,8 @@ use crate::streaming::utils::hash;
 use ahash::AHashMap;
 use error_set::ErrContext;
 use iggy_common::locking::IggySharedMutFn;
-use iggy_common::{IggyTimestamp, PollingStrategy};
 use iggy_common::{IggyError, IggyExpiry, Partitioning, PartitioningKind, 
PollingKind};
+use iggy_common::{IggyTimestamp, PollingStrategy};
 use std::sync::atomic::Ordering;
 use tracing::trace;
 
diff --git a/core/server/src/streaming/topics/storage.rs 
b/core/server/src/streaming/topics/storage.rs
index 827ba6c9..1025dbd3 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -29,11 +29,11 @@ use futures::future::join_all;
 use iggy_common::IggyError;
 use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
+use monoio::fs::create_dir_all;
 use serde::{Deserialize, Serialize};
 use std::path::Path;
 use std::sync::Arc;
-use tokio::fs;
-use tokio::fs::create_dir_all;
+use monoio::fs;
 use tokio::sync::Mutex;
 use tracing::{error, info, warn};
 
@@ -61,14 +61,14 @@ impl TopicStorage for FileTopicStorage {
         topic.compression_algorithm = state.compression_algorithm;
         topic.replication_factor = state.replication_factor.unwrap_or(1);
 
-        let mut dir_entries = fs::read_dir(&topic.partitions_path).await
+        let mut dir_entries = std::fs::read_dir(&topic.partitions_path)
             .with_context(|| format!("Failed to read partition with ID: {} for 
stream with ID: {} for topic with ID: {} and path: {}",
                                      topic.topic_id, topic.stream_id, 
topic.topic_id, &topic.partitions_path))
             .map_err(|_| IggyError::CannotReadPartitions)?;
 
         let mut unloaded_partitions = Vec::new();
-        while let Some(dir_entry) = 
dir_entries.next_entry().await.unwrap_or(None) {
-            let metadata = dir_entry.metadata().await;
+        while let Some(dir_entry) = 
dir_entries.next().transpose().unwrap_or(None) {
+            let metadata = dir_entry.metadata();
             if metadata.is_err() || metadata.unwrap().is_file() {
                 continue;
             }
@@ -88,7 +88,8 @@ impl TopicStorage for FileTopicStorage {
                 error!(
                     "Partition with ID: '{partition_id}' for stream with ID: 
'{stream_id}' and topic with ID: '{topic_id}' was not found in state, but 
exists on disk and will be removed."
                 );
-                if let Err(error) = 
fs::remove_dir_all(&dir_entry.path()).await {
+                // TODO: Replace this with the dir walk impl that is mentioned 
in the main function.
+                if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) 
{
                     error!("Cannot remove partition directory: {error}");
                 } else {
                     warn!(
@@ -269,7 +270,7 @@ impl TopicStorage for FileTopicStorage {
 
     async fn delete(&self, topic: &Topic) -> Result<(), IggyError> {
         info!("Deleting topic {topic}...");
-        if fs::remove_dir_all(&topic.path).await.is_err() {
+        if std::fs::remove_dir_all(&topic.path).is_err() {
             return Err(IggyError::CannotDeleteTopicDirectory(
                 topic.topic_id,
                 topic.stream_id,
diff --git a/core/server/src/streaming/utils/memory_pool.rs 
b/core/server/src/streaming/utils/memory_pool.rs
index 198e8626..16ad2087 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -155,7 +155,8 @@ impl MemoryPool {
 
     /// Initialize the global pool from the given config.
     pub fn init_pool(config: Arc<SystemConfig>) {
-        let is_enabled = config.memory_pool.enabled;
+        // TODO: Fixme — the memory pool is temporarily force-disabled here.
+        let is_enabled = false;
         let memory_limit = config.memory_pool.size.as_bytes_usize();
         let bucket_capacity = config.memory_pool.bucket_capacity as usize;
 
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs 
b/core/server/src/streaming/utils/pooled_buffer.rs
index 0aa1dd3a..a8664325 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -18,7 +18,7 @@
 
 use super::memory_pool::{BytesMutExt, memory_pool};
 use bytes::{Buf, BufMut, BytesMut};
-use monoio::buf::IoBufMut;
+use monoio::buf::{IoBuf, IoBufMut};
 use std::ops::{Deref, DerefMut};
 
 #[derive(Debug)]
@@ -230,3 +230,13 @@ unsafe impl IoBufMut for PooledBuffer {
         unsafe { self.inner.set_init(pos) }
     }
 }
+
+unsafe impl IoBuf for PooledBuffer {
+    fn read_ptr(&self) -> *const u8 {
+        self.inner.read_ptr()
+    }
+
+    fn bytes_init(&self) -> usize {
+        self.inner.bytes_init()
+    }
+}
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 9c92d27d..5622d2a8 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -35,8 +35,14 @@ pub(crate) async fn handle_connection(
     sender: &mut SenderKind,
     shard: &Rc<IggyShard>,
 ) -> Result<(), ConnectionError> {
-    let length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
-    let code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+    let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+    unsafe {
+        length_buffer.set_len(INITIAL_BYTES_LENGTH);
+    }
+    let mut code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+    unsafe {
+        code_buffer.set_len(INITIAL_BYTES_LENGTH);
+    }
     loop {
         let (read_length, initial_buffer) = match 
sender.read(length_buffer.clone()).await {
             (Ok(read_length), initial_buffer) => (read_length, initial_buffer),
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 2af9f494..2f445d4f 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -119,14 +119,14 @@ where
         status
     );
     let prefix = [
-        libc::iovec {
-            iov_base: length.as_ptr() as _,
-            iov_len: length.len(),
-        },
         libc::iovec {
             iov_base: status.as_ptr() as _,
             iov_len: status.len(),
         },
+        libc::iovec {
+            iov_base: length.as_ptr() as _,
+            iov_len: length.len(),
+        },
     ];
     slices.splice(0..0, prefix);
     stream
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index ac29e2b9..c0080edf 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,15 +18,16 @@
 
 use crate::binary::sender::SenderKind;
 use crate::shard::IggyShard;
-use crate::shard::transmission::message::ShardEvent;
+use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::clients::client_manager::Transport;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use crate::tcp::tcp_socket;
+use iggy_common::IggyError;
 use std::net::SocketAddr;
 use std::rc::Rc;
 use tracing::{error, info};
 
-pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) {
+pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> 
Result<(), IggyError> {
     let ip_v6 = shard.config.tcp.ipv6;
     let socket_config = &shard.config.tcp.socket;
     let addr: SocketAddr = shard
@@ -40,8 +41,8 @@ pub async fn start(server_name: &'static str, shard: 
Rc<IggyShard>) {
     monoio::spawn(async move {
         socket
             .bind(&addr.into())
-            .expect("Failed to bind eTCP listener");
-        socket.listen(1024);
+            .expect("Failed to bind TCP listener");
+        socket.listen(1024).unwrap();
         let listener: std::net::TcpListener = socket.into();
         let listener = monoio::net::TcpListener::from_std(listener).unwrap();
         info!("{server_name} server has started on: {:?}", addr);
@@ -50,15 +51,14 @@ pub async fn start(server_name: &'static str, shard: 
Rc<IggyShard>) {
                 Ok((stream, address)) => {
                     let shard = shard.clone();
                     info!("Accepted new TCP connection: {address}");
-                    let session = shard.add_client(&address, Transport::Tcp);
+                    let transport = Transport::Tcp;
+                    let session = shard.add_client(&address, transport);
                     //TODO: Those can be shared with other shards.
                     shard.add_active_session(session.clone());
                     // Broadcast session to all shards.
-                    //TODO: Fixme
-                    /*
-                    let event = 
Rc::new(ShardEvent::NewSession(session.clone()));
-                    shard.broadcast_event_to_all_shards(session.client_id, 
event);
-                    */
+                    let event = ShardEvent::NewSession { address, transport };
+                    // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
+                    let _responses = 
shard.broadcast_event_to_all_shards(event.into());
 
                     let _client_id = session.client_id;
                     info!("Created new session: {session}");
@@ -85,5 +85,6 @@ pub async fn start(server_name: &'static str, shard: 
Rc<IggyShard>) {
                 Err(error) => error!("Unable to accept TCP socket. {error}"),
             }
         }
-    });
+    })
+    .await
 }
diff --git a/core/server/src/tcp/tcp_server.rs 
b/core/server/src/tcp/tcp_server.rs
index 9b8e1c04..0430f773 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -24,17 +24,18 @@ use tracing::info;
 
 /// Starts the TCP server.
 /// Returns the address the server is listening on.
-pub async fn start(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
     let server_name = if shard.config.tcp.tls.enabled {
         "Iggy TCP TLS"
     } else {
         "Iggy TCP"
     };
     info!("Initializing {server_name} server...");
-    let addr = match shard.config.tcp.tls.enabled {
+    // TODO: Fixme — store the server's address in the config for 
integration tests.
+    let result = match shard.config.tcp.tls.enabled {
         true => unimplemented!("TLS support is not implemented yet"),
         false => tcp_listener::start(server_name, shard).await,
     };
-    info!("{server_name} server has started on: {:?}", addr);
-    Ok(())
+    //info!("{server_name} server has started on: {:?}", addr);
+    result
 }
diff --git a/core/server/src/tcp/tcp_socket.rs 
b/core/server/src/tcp/tcp_socket.rs
index 747540fa..7ea3acf0 100644
--- a/core/server/src/tcp/tcp_socket.rs
+++ b/core/server/src/tcp/tcp_socket.rs
@@ -30,6 +30,15 @@ pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket 
{
             .expect("Unable to create an ipv4 socket")
     };
 
+    // Required by the thread-per-core model...
+    // We create a bunch of sockets on different threads that bind to exactly 
the same address and port.
+    socket
+        .set_reuse_address(true)
+        .expect("Unable to set SO_REUSEADDR on socket");
+    socket
+        .set_reuse_port(true)
+        .expect("Unable to set SO_REUSEPORT on socket");
+
     if config.override_defaults {
         config
             .recv_buffer_size
@@ -54,12 +63,6 @@ pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket 
{
         socket
             .set_linger(Some(config.linger.get_duration()))
             .expect("Unable to set SO_LINGER on socket");
-        socket
-            .set_reuse_address(true)
-            .expect("Unable to set SO_REUSEADDR on socket");
-        socket
-            .set_reuse_port(true)
-            .expect("Unable to set SO_REUSEPORT on socket");
     }
 
     socket


Reply via email to